You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2017/12/14 04:38:02 UTC
[1/2] aurora git commit: Recover snapshots via the Op stream
Repository: aurora
Updated Branches:
refs/heads/master 4489dc345 -> 5f79f7ca7
http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotService.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotService.java b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotService.java
new file mode 100644
index 0000000..b30de88
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotService.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.log;
+
+import javax.inject.Inject;
+
+import com.google.common.util.concurrent.AbstractScheduledService;
+
+import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
+import org.apache.aurora.common.inject.TimedInterceptor.Timed;
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.gen.storage.Snapshot;
+import org.apache.aurora.scheduler.log.Log.Stream.InvalidPositionException;
+import org.apache.aurora.scheduler.log.Log.Stream.StreamAccessException;
+import org.apache.aurora.scheduler.storage.SnapshotStore;
+import org.apache.aurora.scheduler.storage.Snapshotter;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
+import org.apache.aurora.scheduler.storage.Storage.StorageException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A {@link SnapshotStore} that snapshots to the log, and automatically snapshots on
+ * a fixed interval.
+ */
+class SnapshotService extends AbstractScheduledService implements SnapshotStore {
+ private static final Logger LOG = LoggerFactory.getLogger(SnapshotService.class);
+
+ private final Storage storage;
+ private final LogPersistence log;
+ private final Snapshotter snapshotter;
+ private final Amount<Long, Time> snapshotInterval;
+
+ @Inject
+ SnapshotService(Storage storage, LogPersistence log, Snapshotter snapshotter, Settings settings) {
+ this.storage = requireNonNull(storage);
+ this.log = requireNonNull(log);
+ this.snapshotter = requireNonNull(snapshotter);
+ this.snapshotInterval = settings.getSnapshotInterval();
+ }
+
+ @Override
+ protected void runOneIteration() {
+ snapshot();
+ }
+
+ @Timed("scheduler_log_snapshot")
+ @Override
+ public void snapshot() throws StorageException {
+ try {
+ LOG.info("Creating snapshot");
+
+ // It's important to perform snapshot creation in a write lock to ensure all upstream callers
+ // are correctly synchronized (e.g. during backup creation).
+ storage.write((NoResult.Quiet) stores -> {
+ Snapshot snapshot = snapshotter.from(stores);
+ LOG.info("Saving snapshot");
+ snapshotWith(snapshot);
+
+ LOG.info("Snapshot complete."
+ + " host attrs: " + snapshot.getHostAttributesSize()
+ + ", cron jobs: " + snapshot.getCronJobsSize()
+ + ", quota confs: " + snapshot.getQuotaConfigurationsSize()
+ + ", tasks: " + snapshot.getTasksSize()
+ + ", updates: " + snapshot.getJobUpdateDetailsSize());
+ });
+ } catch (CodingException e) {
+ throw new StorageException("Failed to encode a snapshot", e);
+ } catch (InvalidPositionException e) {
+ throw new StorageException("Saved snapshot but failed to truncate entries preceding it", e);
+ } catch (StreamAccessException e) {
+ throw new StorageException("Failed to create a snapshot", e);
+ }
+ }
+
+ @Timed("scheduler_log_snapshot_persist")
+ @Override
+ public void snapshotWith(Snapshot snapshot)
+ throws CodingException, InvalidPositionException, StreamAccessException {
+
+ log.persist(snapshot);
+ }
+
+ @Override
+ protected Scheduler scheduler() {
+ return Scheduler.newFixedDelaySchedule(
+ snapshotInterval.getValue(),
+ snapshotInterval.getValue(),
+ snapshotInterval.getUnit().getTimeUnit());
+ }
+
+ /**
+ * Configuration settings for log persistence.
+ */
+ public static class Settings {
+ private final Amount<Long, Time> snapshotInterval;
+
+ Settings(Amount<Long, Time> snapshotInterval) {
+ this.snapshotInterval = requireNonNull(snapshotInterval);
+ }
+
+ public Amount<Long, Time> getSnapshotInterval() {
+ return snapshotInterval;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
index 739fad7..5aefe5f 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java
@@ -13,10 +13,11 @@
*/
package org.apache.aurora.scheduler.storage.log;
-import java.util.Arrays;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import javax.inject.Inject;
@@ -24,36 +25,35 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
-import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Streams;
import org.apache.aurora.common.inject.TimedInterceptor.Timed;
import org.apache.aurora.common.stats.SlidingStats;
import org.apache.aurora.common.stats.SlidingStats.Timeable;
import org.apache.aurora.common.util.BuildInfo;
import org.apache.aurora.common.util.Clock;
-import org.apache.aurora.gen.HostAttributes;
-import org.apache.aurora.gen.JobInstanceUpdateEvent;
-import org.apache.aurora.gen.JobUpdateDetails;
-import org.apache.aurora.gen.JobUpdateEvent;
+import org.apache.aurora.gen.storage.Op;
import org.apache.aurora.gen.storage.QuotaConfiguration;
+import org.apache.aurora.gen.storage.SaveCronJob;
+import org.apache.aurora.gen.storage.SaveFrameworkId;
+import org.apache.aurora.gen.storage.SaveHostAttributes;
+import org.apache.aurora.gen.storage.SaveJobInstanceUpdateEvent;
+import org.apache.aurora.gen.storage.SaveJobUpdate;
+import org.apache.aurora.gen.storage.SaveJobUpdateEvent;
+import org.apache.aurora.gen.storage.SaveQuota;
+import org.apache.aurora.gen.storage.SaveTasks;
import org.apache.aurora.gen.storage.SchedulerMetadata;
import org.apache.aurora.gen.storage.Snapshot;
import org.apache.aurora.gen.storage.StoredCronJob;
import org.apache.aurora.gen.storage.StoredJobUpdateDetails;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.storage.JobUpdateStore;
-import org.apache.aurora.scheduler.storage.SnapshotStore;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
-import org.apache.aurora.scheduler.storage.Storage.Volatile;
-import org.apache.aurora.scheduler.storage.durability.ThriftBackfill;
+import org.apache.aurora.scheduler.storage.Snapshotter;
+import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
-import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.slf4j.Logger;
@@ -65,7 +65,7 @@ import static java.util.Objects.requireNonNull;
* Snapshot store implementation that delegates to underlying snapshot stores by
* extracting/applying fields in a snapshot thrift struct.
*/
-public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
+public class SnapshotStoreImpl implements Snapshotter {
@VisibleForTesting
static final String SNAPSHOT_SAVE = "snapshot_save_";
@@ -83,63 +83,62 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
@VisibleForTesting
Set<String> snapshotFieldNames() {
- return FluentIterable.from(snapshotFields)
- .transform(SnapshotField::getName)
- .toSet();
+ return snapshotFields.stream()
+ .map(SnapshotField::getName)
+ .collect(Collectors.toSet());
}
- private final Iterable<SnapshotField> snapshotFields = Arrays.asList(
+ private final List<SnapshotField> snapshotFields = ImmutableList.of(
new SnapshotField() {
@Override
- public String getName() {
+ String getName() {
return HOST_ATTRIBUTES_FIELD;
}
@Override
- public void saveToSnapshot(MutableStoreProvider store, Snapshot snapshot) {
+ void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
snapshot.setHostAttributes(
IHostAttributes.toBuildersSet(store.getAttributeStore().getHostAttributes()));
}
@Override
- public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) {
+ Stream<Op> doStreamFrom(Snapshot snapshot) {
if (snapshot.getHostAttributesSize() > 0) {
- store.getAttributeStore().deleteHostAttributes();
- for (HostAttributes attributes : snapshot.getHostAttributes()) {
- store.getAttributeStore().saveHostAttributes(IHostAttributes.build(attributes));
- }
+ return snapshot.getHostAttributes().stream()
+ .map(attributes -> Op.saveHostAttributes(
+ new SaveHostAttributes().setHostAttributes(attributes)));
}
+ return Stream.empty();
}
},
new SnapshotField() {
@Override
- public String getName() {
+ String getName() {
return TASK_FIELD;
}
@Override
- public void saveToSnapshot(MutableStoreProvider store, Snapshot snapshot) {
+ void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
snapshot.setTasks(
IScheduledTask.toBuildersSet(store.getTaskStore().fetchTasks(Query.unscoped())));
}
@Override
- public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) {
+ Stream<Op> doStreamFrom(Snapshot snapshot) {
if (snapshot.getTasksSize() > 0) {
- store.getUnsafeTaskStore().deleteAllTasks();
- store.getUnsafeTaskStore()
- .saveTasks(thriftBackfill.backfillTasks(snapshot.getTasks()));
+ return Stream.of(Op.saveTasks(new SaveTasks().setTasks(snapshot.getTasks())));
}
+ return Stream.empty();
}
},
new SnapshotField() {
@Override
- public String getName() {
+ String getName() {
return CRON_FIELD;
}
@Override
- public void saveToSnapshot(MutableStoreProvider store, Snapshot snapshot) {
+ void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
ImmutableSet.Builder<StoredCronJob> jobs = ImmutableSet.builder();
for (IJobConfiguration config : store.getCronJobStore().fetchJobs()) {
@@ -149,46 +148,46 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
}
@Override
- public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) {
+ Stream<Op> doStreamFrom(Snapshot snapshot) {
if (snapshot.getCronJobsSize() > 0) {
- store.getCronJobStore().deleteJobs();
- for (StoredCronJob job : snapshot.getCronJobs()) {
- store.getCronJobStore().saveAcceptedJob(
- thriftBackfill.backfillJobConfiguration(job.getJobConfiguration()));
- }
+ return snapshot.getCronJobs().stream()
+ .map(job -> Op.saveCronJob(
+ new SaveCronJob().setJobConfig(job.getJobConfiguration())));
}
+ return Stream.empty();
}
},
new SnapshotField() {
@Override
- public String getName() {
+ String getName() {
return SCHEDULER_METADATA_FIELD;
}
@Override
- public void saveToSnapshot(MutableStoreProvider store, Snapshot snapshot) {
+ void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
// SchedulerMetadata is updated outside of the static list of SnapshotFields
}
@Override
- public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) {
+ Stream<Op> doStreamFrom(Snapshot snapshot) {
if (snapshot.isSetSchedulerMetadata()
&& snapshot.getSchedulerMetadata().isSetFrameworkId()) {
// No delete necessary here since this is a single value.
- store.getSchedulerStore()
- .saveFrameworkId(snapshot.getSchedulerMetadata().getFrameworkId());
+ return Stream.of(Op.saveFrameworkId(
+ new SaveFrameworkId().setId(snapshot.getSchedulerMetadata().getFrameworkId())));
}
+ return Stream.empty();
}
},
new SnapshotField() {
@Override
- public String getName() {
+ String getName() {
return QUOTA_FIELD;
}
@Override
- public void saveToSnapshot(MutableStoreProvider store, Snapshot snapshot) {
+ void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
ImmutableSet.Builder<QuotaConfiguration> quotas = ImmutableSet.builder();
for (Map.Entry<String, IResourceAggregate> entry
: store.getQuotaStore().fetchQuotas().entrySet()) {
@@ -200,24 +199,24 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
}
@Override
- public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) {
+ Stream<Op> doStreamFrom(Snapshot snapshot) {
if (snapshot.getQuotaConfigurationsSize() > 0) {
- store.getQuotaStore().deleteQuotas();
- for (QuotaConfiguration quota : snapshot.getQuotaConfigurations()) {
- store.getQuotaStore()
- .saveQuota(quota.getRole(), IResourceAggregate.build(quota.getQuota()));
- }
+ return snapshot.getQuotaConfigurations().stream()
+ .map(quota -> Op.saveQuota(new SaveQuota()
+ .setRole(quota.getRole())
+ .setQuota(quota.getQuota())));
}
+ return Stream.empty();
}
},
new SnapshotField() {
@Override
- public String getName() {
+ String getName() {
return JOB_UPDATE_FIELD;
}
@Override
- public void saveToSnapshot(MutableStoreProvider store, Snapshot snapshot) {
+ void saveToSnapshot(StoreProvider store, Snapshot snapshot) {
snapshot.setJobUpdateDetails(
store.getJobUpdateStore().fetchJobUpdates(JobUpdateStore.MATCH_ALL).stream()
.map(u -> new StoredJobUpdateDetails().setDetails(u.newBuilder()))
@@ -225,112 +224,101 @@ public class SnapshotStoreImpl implements SnapshotStore<Snapshot> {
}
@Override
- public void restoreFromSnapshot(MutableStoreProvider store, Snapshot snapshot) {
+ Stream<Op> doStreamFrom(Snapshot snapshot) {
if (snapshot.getJobUpdateDetailsSize() > 0) {
- JobUpdateStore.Mutable updateStore = store.getJobUpdateStore();
- updateStore.deleteAllUpdates();
- for (StoredJobUpdateDetails storedDetails : snapshot.getJobUpdateDetails()) {
- JobUpdateDetails details = storedDetails.getDetails();
- updateStore.saveJobUpdate(thriftBackfill.backFillJobUpdate(details.getUpdate()));
-
- if (details.getUpdateEventsSize() > 0) {
- for (JobUpdateEvent updateEvent : details.getUpdateEvents()) {
- updateStore.saveJobUpdateEvent(
- IJobUpdateKey.build(details.getUpdate().getSummary().getKey()),
- IJobUpdateEvent.build(updateEvent));
- }
- }
-
- if (details.getInstanceEventsSize() > 0) {
- for (JobInstanceUpdateEvent instanceEvent : details.getInstanceEvents()) {
- updateStore.saveJobInstanceUpdateEvent(
- IJobUpdateKey.build(details.getUpdate().getSummary().getKey()),
- IJobInstanceUpdateEvent.build(instanceEvent));
- }
- }
- }
+ return snapshot.getJobUpdateDetails().stream()
+ .flatMap(details -> {
+ Stream<Op> parent = Stream.of(Op.saveJobUpdate(
+ new SaveJobUpdate().setJobUpdate(details.getDetails().getUpdate())));
+ Stream<Op> jobEvents;
+ if (details.getDetails().getUpdateEventsSize() > 0) {
+ jobEvents = details.getDetails().getUpdateEvents().stream()
+ .map(event -> Op.saveJobUpdateEvent(
+ new SaveJobUpdateEvent()
+ .setKey(details.getDetails().getUpdate().getSummary().getKey())
+ .setEvent(event)));
+ } else {
+ jobEvents = Stream.empty();
+ }
+
+ Stream<Op> instanceEvents;
+ if (details.getDetails().getInstanceEventsSize() > 0) {
+ instanceEvents = details.getDetails().getInstanceEvents().stream()
+ .map(event -> Op.saveJobInstanceUpdateEvent(
+ new SaveJobInstanceUpdateEvent()
+ .setKey(details.getDetails().getUpdate().getSummary().getKey())
+ .setEvent(event)));
+ } else {
+ instanceEvents = Stream.empty();
+ }
+
+ return Streams.concat(parent, jobEvents, instanceEvents);
+ });
}
+ return Stream.empty();
}
}
);
private final BuildInfo buildInfo;
private final Clock clock;
- private final Storage storage;
- private final ThriftBackfill thriftBackfill;
@Inject
- public SnapshotStoreImpl(
- BuildInfo buildInfo,
- Clock clock,
- @Volatile Storage storage,
- ThriftBackfill thriftBackfill) {
-
+ public SnapshotStoreImpl(BuildInfo buildInfo, Clock clock) {
this.buildInfo = requireNonNull(buildInfo);
this.clock = requireNonNull(clock);
- this.storage = requireNonNull(storage);
- this.thriftBackfill = requireNonNull(thriftBackfill);
}
- private Snapshot createSnapshot(Storage anyStorage) {
- // It's important to perform snapshot creation in a write lock to ensure all upstream callers
- // are correctly synchronized (e.g. during backup creation).
- return anyStorage.write(storeProvider -> {
- Snapshot snapshot = new Snapshot();
-
- // Capture timestamp to signify the beginning of a snapshot operation, apply after in case
- // one of the field closures is mean and tries to apply a timestamp.
- long timestamp = clock.nowMillis();
- for (SnapshotField field : snapshotFields) {
- field.save(storeProvider, snapshot);
- }
+ private Snapshot createSnapshot(StoreProvider storeProvider) {
+ Snapshot snapshot = new Snapshot();
+
+ // Capture timestamp to signify the beginning of a snapshot operation, apply after in case
+ // one of the field closures is mean and tries to apply a timestamp.
+ long timestamp = clock.nowMillis();
+ for (SnapshotField field : snapshotFields) {
+ field.save(storeProvider, snapshot);
+ }
- SchedulerMetadata metadata = new SchedulerMetadata()
- .setFrameworkId(storeProvider.getSchedulerStore().fetchFrameworkId().orNull())
- .setDetails(buildInfo.getProperties());
+ SchedulerMetadata metadata = new SchedulerMetadata()
+ .setFrameworkId(storeProvider.getSchedulerStore().fetchFrameworkId().orNull())
+ .setDetails(buildInfo.getProperties());
- snapshot.setSchedulerMetadata(metadata);
- snapshot.setTimestamp(timestamp);
- return snapshot;
- });
+ snapshot.setSchedulerMetadata(metadata);
+ snapshot.setTimestamp(timestamp);
+ return snapshot;
}
@Timed("snapshot_create")
@Override
- public Snapshot createSnapshot() {
- return createSnapshot(storage);
+ public Snapshot from(StoreProvider stores) {
+ return createSnapshot(stores);
}
@Timed("snapshot_apply")
@Override
- public void applySnapshot(final Snapshot snapshot) {
+ public Stream<Op> asStream(Snapshot snapshot) {
requireNonNull(snapshot);
- storage.write((NoResult.Quiet) storeProvider -> {
- LOG.info("Restoring snapshot.");
-
- for (SnapshotField field : snapshotFields) {
- field.restore(storeProvider, snapshot);
- }
- });
+ LOG.info("Restoring snapshot.");
+ return snapshotFields.stream()
+ .flatMap(field -> field.streamFrom(snapshot));
}
abstract class SnapshotField {
abstract String getName();
- abstract void saveToSnapshot(MutableStoreProvider storeProvider, Snapshot snapshot);
+ abstract void saveToSnapshot(StoreProvider storeProvider, Snapshot snapshot);
- abstract void restoreFromSnapshot(MutableStoreProvider storeProvider, Snapshot snapshot);
+ abstract Stream<Op> doStreamFrom(Snapshot snapshot);
- void save(MutableStoreProvider storeProvider, Snapshot snapshot) {
+ void save(StoreProvider storeProvider, Snapshot snapshot) {
stats.getUnchecked(SNAPSHOT_SAVE + getName())
.time((Timeable.NoResult.Quiet) () -> saveToSnapshot(storeProvider, snapshot));
}
- void restore(MutableStoreProvider storeProvider, Snapshot snapshot) {
- stats.getUnchecked(SNAPSHOT_RESTORE + getName())
- .time((Timeable.NoResult.Quiet) () -> restoreFromSnapshot(storeProvider, snapshot));
+ Stream<Op> streamFrom(Snapshot snapshot) {
+ return stats.getUnchecked(SNAPSHOT_RESTORE + getName()).time(() -> doStreamFrom(snapshot));
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
index 159fb29..1b003ab 100644
--- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
+++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java
@@ -83,7 +83,7 @@ import org.apache.aurora.scheduler.state.MaintenanceController;
import org.apache.aurora.scheduler.state.StateChangeResult;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.state.UUIDGenerator;
-import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
+import org.apache.aurora.scheduler.storage.SnapshotStore;
import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
@@ -170,7 +170,7 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin {
private final ConfigurationManager configurationManager;
private final Thresholds thresholds;
private final NonVolatileStorage storage;
- private final DistributedSnapshotStore snapshotStore;
+ private final SnapshotStore snapshotStore;
private final StorageBackup backup;
private final Recovery recovery;
private final MaintenanceController maintenance;
@@ -200,7 +200,7 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin {
ConfigurationManager configurationManager,
Thresholds thresholds,
NonVolatileStorage storage,
- DistributedSnapshotStore snapshotStore,
+ SnapshotStore snapshotStore,
StorageBackup backup,
Recovery recovery,
CronJobManager cronJobManager,
http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java b/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java
index aeb8685..020f348 100644
--- a/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java
+++ b/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java
@@ -35,7 +35,7 @@ import org.apache.aurora.scheduler.config.CommandLine;
import org.apache.aurora.scheduler.mesos.DriverFactory;
import org.apache.aurora.scheduler.mesos.DriverSettings;
import org.apache.aurora.scheduler.mesos.FrameworkInfoFactory;
-import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
+import org.apache.aurora.scheduler.storage.SnapshotStore;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
import org.apache.mesos.SchedulerDriver;
@@ -83,7 +83,7 @@ public final class LocalSchedulerMain {
protected void configure() {
bind(Storage.class).to(Key.get(Storage.class, Storage.Volatile.class));
bind(NonVolatileStorage.class).to(FakeNonVolatileStorage.class);
- bind(DistributedSnapshotStore.class).toInstance(new DistributedSnapshotStore() {
+ bind(SnapshotStore.class).toInstance(new SnapshotStore() {
@Override
public void snapshot() throws Storage.StorageException {
// no-op
http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java b/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
index 5cb5310..53a2315 100644
--- a/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
@@ -171,7 +171,6 @@ public class CommandLineTest {
expected.updater.enableAffinity = true;
expected.updater.affinityExpiration = TEST_TIME;
expected.state.taskAssignerModules = ImmutableList.of(NoopModule.class);
- expected.logStorage.shutdownGracePeriod = TEST_TIME;
expected.logStorage.snapshotInterval = TEST_TIME;
expected.logStorage.maxLogEntrySize = TEST_DATA;
expected.backup.backupInterval = TEST_TIME;
@@ -318,7 +317,6 @@ public class CommandLineTest {
"-enable_update_affinity=true",
"-update_affinity_reservation_hold_time=42days",
"-task_assigner_modules=org.apache.aurora.scheduler.config.CommandLineTest$NoopModule",
- "-dlog_shutdown_grace_period=42days",
"-dlog_snapshot_interval=42days",
"-dlog_max_entry_size=42GB",
"-backup_interval=42days",
http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java b/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java
index 09560f4..ba03ff9 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java
@@ -31,8 +31,8 @@ import org.apache.aurora.gen.storage.Snapshot;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.TaskTestUtil;
import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
import org.apache.aurora.scheduler.storage.SnapshotStore;
+import org.apache.aurora.scheduler.storage.Snapshotter;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
import org.apache.aurora.scheduler.storage.Storage.MutateWork;
@@ -49,6 +49,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.capture;
import static org.easymock.EasyMock.expect;
import static org.junit.Assert.assertEquals;
@@ -60,8 +61,8 @@ public class RecoveryTest extends EasyMockTest {
private static final IScheduledTask TASK2 = TaskTestUtil.makeTask("task2", TaskTestUtil.JOB);
private static final Snapshot SNAPSHOT1 = makeSnapshot(TASK1, TASK2);
- private SnapshotStore<Snapshot> snapshotStore;
- private DistributedSnapshotStore distributedStore;
+ private Snapshotter snapshotter;
+ private SnapshotStore distributedStore;
private Storage primaryStorage;
private MutableStoreProvider storeProvider;
private Command shutDownNow;
@@ -74,8 +75,8 @@ public class RecoveryTest extends EasyMockTest {
@Before
public void setUp() throws IOException {
final File backupDir = temporaryFolder.newFolder();
- snapshotStore = createMock(new Clazz<SnapshotStore<Snapshot>>() { });
- distributedStore = createMock(DistributedSnapshotStore.class);
+ snapshotter = createMock(Snapshotter.class);
+ distributedStore = createMock(SnapshotStore.class);
primaryStorage = createMock(Storage.class);
storeProvider = createMock(MutableStoreProvider.class);
shutDownNow = createMock(Command.class);
@@ -84,7 +85,8 @@ public class RecoveryTest extends EasyMockTest {
TemporaryStorageFactory factory =
new TemporaryStorageFactory(TaskTestUtil.THRIFT_BACKFILL);
storageBackup = new StorageBackupImpl(
- snapshotStore,
+ primaryStorage,
+ snapshotter,
clock,
new BackupConfig(backupDir, 5, INTERVAL),
executor);
@@ -94,7 +96,7 @@ public class RecoveryTest extends EasyMockTest {
@Test
public void testRecover() throws Exception {
- expect(snapshotStore.createSnapshot()).andReturn(SNAPSHOT1);
+ expect(snapshotter.from(anyObject())).andReturn(SNAPSHOT1);
Capture<MutateWork<Object, Exception>> transaction = createCapture();
expect(primaryStorage.write(capture(transaction))).andReturn(null);
Capture<Snapshot> snapshot = createCapture();
@@ -106,7 +108,7 @@ public class RecoveryTest extends EasyMockTest {
assertEquals(ImmutableSet.of(), recovery.listBackups());
clock.advance(INTERVAL);
- storageBackup.createSnapshot();
+ storageBackup.from(storeProvider);
String backup1 = storageBackup.createBackupName();
assertEquals(ImmutableSet.of(backup1), recovery.listBackups());
@@ -122,7 +124,7 @@ public class RecoveryTest extends EasyMockTest {
@Test
public void testModifySnapshotBeforeCommit() throws Exception {
- expect(snapshotStore.createSnapshot()).andReturn(SNAPSHOT1);
+ expect(snapshotter.from(anyObject())).andReturn(SNAPSHOT1);
Snapshot modified = SNAPSHOT1.deepCopy().setTasks(ImmutableSet.of(TASK1.newBuilder()));
Capture<MutateWork<Object, Exception>> transaction = createCapture();
expect(primaryStorage.write(capture(transaction))).andReturn(null);
@@ -133,7 +135,7 @@ public class RecoveryTest extends EasyMockTest {
control.replay();
clock.advance(INTERVAL);
- storageBackup.createSnapshot();
+ storageBackup.from(storeProvider);
String backup1 = storageBackup.createBackupName();
recovery.stage(backup1);
assertEquals(
http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/test/java/org/apache/aurora/scheduler/storage/backup/StorageBackupTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/backup/StorageBackupTest.java b/src/test/java/org/apache/aurora/scheduler/storage/backup/StorageBackupTest.java
index fff376f..f0ba33f 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/backup/StorageBackupTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/backup/StorageBackupTest.java
@@ -39,9 +39,12 @@ import org.apache.aurora.gen.storage.QuotaConfiguration;
import org.apache.aurora.gen.storage.SchedulerMetadata;
import org.apache.aurora.gen.storage.Snapshot;
import org.apache.aurora.gen.storage.StoredCronJob;
-import org.apache.aurora.scheduler.storage.SnapshotStore;
+import org.apache.aurora.scheduler.storage.Snapshotter;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
import org.apache.aurora.scheduler.storage.backup.StorageBackup.StorageBackupImpl;
import org.apache.aurora.scheduler.storage.backup.StorageBackup.StorageBackupImpl.BackupConfig;
+import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
import org.apache.aurora.scheduler.testing.FakeScheduledExecutor;
import org.junit.Before;
import org.junit.Rule;
@@ -49,6 +52,7 @@ import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import static org.apache.aurora.scheduler.resources.ResourceTestUtil.aggregate;
+import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.expect;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -58,7 +62,8 @@ public class StorageBackupTest extends EasyMockTest {
private static final int MAX_BACKUPS = 5;
private static final Amount<Long, Time> INTERVAL = Amount.of(1L, Time.HOURS);
- private SnapshotStore<Snapshot> delegate;
+ private Storage storage;
+ private Snapshotter delegate;
private FakeClock clock;
private BackupConfig config;
private StorageBackupImpl storageBackup;
@@ -67,29 +72,35 @@ public class StorageBackupTest extends EasyMockTest {
@Before
public void setUp() throws IOException {
- delegate = createMock(new Clazz<SnapshotStore<Snapshot>>() { });
+ storage = MemStorageModule.newEmptyStorage();
+ delegate = createMock(Snapshotter.class);
final File backupDir = temporaryFolder.newFolder();
ScheduledExecutorService executor = createMock(ScheduledExecutorService.class);
clock = FakeScheduledExecutor.scheduleExecutor(executor);
config = new BackupConfig(backupDir, MAX_BACKUPS, INTERVAL);
clock.advance(Amount.of(365 * 30L, Time.DAYS));
- storageBackup = new StorageBackupImpl(delegate, clock, config, executor);
+ storageBackup = new StorageBackupImpl(storage, delegate, clock, config, executor);
+ }
+
+ private void triggerSnapshot(Snapshot expectedResult) {
+ storage.write((NoResult.Quiet) stores ->
+ assertEquals(expectedResult, storageBackup.from(stores)));
}
@Test
public void testBackup() throws Exception {
Snapshot snapshot = makeSnapshot();
- expect(delegate.createSnapshot()).andReturn(snapshot).times(3);
+ expect(delegate.from(anyObject())).andReturn(snapshot).times(3);
control.replay();
- assertEquals(snapshot, storageBackup.createSnapshot());
+ triggerSnapshot(snapshot);
assertBackupCount(0);
clock.advance(Amount.of(INTERVAL.as(Time.MILLISECONDS) - 1, Time.MILLISECONDS));
- assertEquals(snapshot, storageBackup.createSnapshot());
+ triggerSnapshot(snapshot);
assertBackupCount(0);
clock.advance(Amount.of(1L, Time.MILLISECONDS));
- assertEquals(snapshot, storageBackup.createSnapshot());
+ triggerSnapshot(snapshot);
assertBackupCount(1);
assertEquals(1, storageBackup.getSuccesses().get());
@@ -104,34 +115,34 @@ public class StorageBackupTest extends EasyMockTest {
@Test
public void testDirectoryMissing() {
Snapshot snapshot = makeSnapshot();
- expect(delegate.createSnapshot()).andReturn(snapshot).times(1);
+ expect(delegate.from(anyObject())).andReturn(snapshot).times(1);
control.replay();
clock.advance(INTERVAL);
config.getDir().delete();
- assertEquals(snapshot, storageBackup.createSnapshot());
+ triggerSnapshot(snapshot);
assertEquals(1, storageBackup.getFailures().get());
}
@Test
public void testOldBackupsDeleted() {
Snapshot snapshot = makeSnapshot();
- expect(delegate.createSnapshot()).andReturn(snapshot).times(MAX_BACKUPS + 1);
+ expect(delegate.from(anyObject())).andReturn(snapshot).times(MAX_BACKUPS + 1);
control.replay();
ImmutableList.Builder<String> nameBuilder = ImmutableList.builder();
for (int i = 0; i < MAX_BACKUPS; i++) {
clock.advance(Amount.of(INTERVAL.as(Time.MILLISECONDS), Time.MILLISECONDS));
- assertEquals(snapshot, storageBackup.createSnapshot());
+ triggerSnapshot(snapshot);
nameBuilder.add(storageBackup.createBackupName());
assertBackupCount(i + 1);
assertEquals(i + 1, storageBackup.getSuccesses().get());
}
clock.advance(Amount.of(INTERVAL.as(Time.MILLISECONDS), Time.MILLISECONDS));
- assertEquals(snapshot, storageBackup.createSnapshot());
+ triggerSnapshot(snapshot);
nameBuilder.add(storageBackup.createBackupName());
assertBackupCount(MAX_BACKUPS);
assertEquals(MAX_BACKUPS + 1, storageBackup.getSuccesses().get());
@@ -150,17 +161,17 @@ public class StorageBackupTest extends EasyMockTest {
public void testInterval() {
// Ensures that a long initial interval does not result in shortened subsequent intervals.
Snapshot snapshot = makeSnapshot();
- expect(delegate.createSnapshot()).andReturn(snapshot).times(3);
+ expect(delegate.from(anyObject())).andReturn(snapshot).times(3);
control.replay();
- assertEquals(snapshot, storageBackup.createSnapshot());
+ triggerSnapshot(snapshot);
assertBackupCount(0);
clock.advance(Amount.of(INTERVAL.as(Time.MILLISECONDS) * 3, Time.MILLISECONDS));
- assertEquals(snapshot, storageBackup.createSnapshot());
+ triggerSnapshot(snapshot);
assertBackupCount(1);
assertEquals(1, storageBackup.getSuccesses().get());
- assertEquals(snapshot, storageBackup.createSnapshot());
+ triggerSnapshot(snapshot);
assertBackupCount(1);
assertEquals(1, storageBackup.getSuccesses().get());
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/test/java/org/apache/aurora/scheduler/storage/durability/DurableStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/durability/DurableStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/durability/DurableStorageTest.java
index 3ad40ad..a6bf330 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/durability/DurableStorageTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/durability/DurableStorageTest.java
@@ -13,7 +13,6 @@
*/
package org.apache.aurora.scheduler.storage.durability;
-import java.util.EnumSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
@@ -74,6 +73,7 @@ import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
import org.apache.aurora.scheduler.storage.Storage.MutateWork;
import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult.Quiet;
+import org.apache.aurora.scheduler.storage.durability.Persistence.Edit;
import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
@@ -167,44 +167,49 @@ public class DurableStorageTest extends EasyMockTest {
// Populate all Op types.
buildReplayOps();
+ storageUtil.expectStoreAccesses();
+
control.replay();
durableStorage.prepare();
durableStorage.start(initializationLogic);
assertTrue(initialized.get());
-
- // Assert all Transaction types have handlers defined.
- assertEquals(
- EnumSet.allOf(Op._Fields.class),
- EnumSet.copyOf(durableStorage.buildTransactionReplayActions().keySet()));
}
private void buildReplayOps() throws Exception {
- ImmutableSet.Builder<Op> builder = ImmutableSet.builder();
+ ImmutableSet.Builder<Edit> builder = ImmutableSet.builder();
- builder.add(Op.saveFrameworkId(new SaveFrameworkId("bob")));
+ builder.add(Edit.op(Op.saveFrameworkId(new SaveFrameworkId("bob"))));
storageUtil.schedulerStore.saveFrameworkId("bob");
JobConfiguration actualJob = new JobConfiguration().setTaskConfig(nonBackfilledConfig());
JobConfiguration expectedJob =
new JobConfiguration().setTaskConfig(makeConfig(JOB_KEY).newBuilder());
SaveCronJob cronJob = new SaveCronJob().setJobConfig(actualJob);
- builder.add(Op.saveCronJob(cronJob));
+ builder.add(Edit.op(Op.saveCronJob(cronJob)));
storageUtil.jobStore.saveAcceptedJob(IJobConfiguration.build(expectedJob));
RemoveJob removeJob = new RemoveJob(JOB_KEY.newBuilder());
- builder.add(Op.removeJob(removeJob));
+ builder.add(Edit.op(Op.removeJob(removeJob)));
storageUtil.jobStore.removeJob(JOB_KEY);
ScheduledTask actualTask = makeTask("id", JOB_KEY).newBuilder();
actualTask.getAssignedTask().setTask(nonBackfilledConfig());
IScheduledTask expectedTask = makeTask("id", JOB_KEY);
SaveTasks saveTasks = new SaveTasks(ImmutableSet.of(actualTask));
- builder.add(Op.saveTasks(saveTasks));
+ builder.add(Edit.op(Op.saveTasks(saveTasks)));
storageUtil.taskStore.saveTasks(ImmutableSet.of(expectedTask));
+ // Side-effects from a storage reset, caused by a snapshot.
+ builder.add(Edit.deleteAll());
+ storageUtil.jobStore.deleteJobs();
+ storageUtil.taskStore.deleteAllTasks();
+ storageUtil.quotaStore.deleteQuotas();
+ storageUtil.attributeStore.deleteHostAttributes();
+ storageUtil.jobUpdateStore.deleteAllUpdates();
+
RemoveTasks removeTasks = new RemoveTasks(ImmutableSet.of("taskId1"));
- builder.add(Op.removeTasks(removeTasks));
+ builder.add(Edit.op(Op.removeTasks(removeTasks)));
storageUtil.taskStore.deleteTasks(removeTasks.getTaskIds());
ResourceAggregate nonBackfilled = new ResourceAggregate()
@@ -212,33 +217,33 @@ public class DurableStorageTest extends EasyMockTest {
.setRamMb(32)
.setDiskMb(64);
SaveQuota saveQuota = new SaveQuota(JOB_KEY.getRole(), nonBackfilled);
- builder.add(Op.saveQuota(saveQuota));
+ builder.add(Edit.op(Op.saveQuota(saveQuota)));
storageUtil.quotaStore.saveQuota(
saveQuota.getRole(),
IResourceAggregate.build(nonBackfilled.deepCopy()
.setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)))));
- builder.add(Op.removeQuota(new RemoveQuota(JOB_KEY.getRole())));
+ builder.add(Edit.op(Op.removeQuota(new RemoveQuota(JOB_KEY.getRole()))));
storageUtil.quotaStore.removeQuota(JOB_KEY.getRole());
// This entry lacks a slave ID, and should therefore be discarded.
SaveHostAttributes hostAttributes1 = new SaveHostAttributes(new HostAttributes()
.setHost("host1")
.setMode(MaintenanceMode.DRAINED));
- builder.add(Op.saveHostAttributes(hostAttributes1));
+ builder.add(Edit.op(Op.saveHostAttributes(hostAttributes1)));
SaveHostAttributes hostAttributes2 = new SaveHostAttributes(new HostAttributes()
.setHost("host2")
.setSlaveId("slave2")
.setMode(MaintenanceMode.DRAINED));
- builder.add(Op.saveHostAttributes(hostAttributes2));
+ builder.add(Edit.op(Op.saveHostAttributes(hostAttributes2)));
expect(storageUtil.attributeStore.saveHostAttributes(
IHostAttributes.build(hostAttributes2.getHostAttributes()))).andReturn(true);
- builder.add(Op.saveLock(new SaveLock()));
+ builder.add(Edit.op(Op.saveLock(new SaveLock())));
// TODO(jly): Deprecated, this is a no-op to be removed in 0.21. See AURORA-1959.
- builder.add(Op.removeLock(new RemoveLock()));
+ builder.add(Edit.op(Op.removeLock(new RemoveLock())));
// TODO(jly): Deprecated, this is a no-op to be removed in 0.21. See AURORA-1959.
JobUpdate actualUpdate = new JobUpdate()
@@ -252,12 +257,12 @@ public class DurableStorageTest extends EasyMockTest {
expectedUpdate.getInstructions().getInitialState()
.forEach(e -> e.setTask(makeConfig(JOB_KEY).newBuilder()));
SaveJobUpdate saveUpdate = new SaveJobUpdate().setJobUpdate(actualUpdate);
- builder.add(Op.saveJobUpdate(saveUpdate));
+ builder.add(Edit.op(Op.saveJobUpdate(saveUpdate)));
storageUtil.jobUpdateStore.saveJobUpdate(IJobUpdate.build(expectedUpdate));
SaveJobUpdateEvent saveUpdateEvent =
new SaveJobUpdateEvent(new JobUpdateEvent(), UPDATE_ID.newBuilder());
- builder.add(Op.saveJobUpdateEvent(saveUpdateEvent));
+ builder.add(Edit.op(Op.saveJobUpdateEvent(saveUpdateEvent)));
storageUtil.jobUpdateStore.saveJobUpdateEvent(
UPDATE_ID,
IJobUpdateEvent.build(saveUpdateEvent.getEvent()));
@@ -265,16 +270,16 @@ public class DurableStorageTest extends EasyMockTest {
SaveJobInstanceUpdateEvent saveInstanceEvent = new SaveJobInstanceUpdateEvent(
new JobInstanceUpdateEvent(),
UPDATE_ID.newBuilder());
- builder.add(Op.saveJobInstanceUpdateEvent(saveInstanceEvent));
+ builder.add(Edit.op(Op.saveJobInstanceUpdateEvent(saveInstanceEvent)));
storageUtil.jobUpdateStore.saveJobInstanceUpdateEvent(
UPDATE_ID,
IJobInstanceUpdateEvent.build(saveInstanceEvent.getEvent()));
- builder.add(Op.pruneJobUpdateHistory(new PruneJobUpdateHistory(5, 10L)));
+ builder.add(Edit.op(Op.pruneJobUpdateHistory(new PruneJobUpdateHistory(5, 10L))));
// No expectation - this op is ignored.
- builder.add(Op.removeJobUpdate(
- new RemoveJobUpdates().setKeys(ImmutableSet.of(UPDATE_ID.newBuilder()))));
+ builder.add(Edit.op(Op.removeJobUpdate(
+ new RemoveJobUpdates().setKeys(ImmutableSet.of(UPDATE_ID.newBuilder())))));
storageUtil.jobUpdateStore.removeJobUpdates(ImmutableSet.of(UPDATE_ID));
expect(persistence.recover()).andReturn(builder.build().stream());
http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/test/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorageTest.java
deleted file mode 100644
index e8b564b..0000000
--- a/src/test/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorageTest.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage.durability;
-
-import java.util.Set;
-
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableSet;
-
-import org.apache.aurora.common.testing.easymock.EasyMockTest;
-import org.apache.aurora.gen.Attribute;
-import org.apache.aurora.gen.HostAttributes;
-import org.apache.aurora.gen.JobUpdateKey;
-import org.apache.aurora.gen.MaintenanceMode;
-import org.apache.aurora.gen.storage.Op;
-import org.apache.aurora.gen.storage.SaveHostAttributes;
-import org.apache.aurora.gen.storage.SaveTasks;
-import org.apache.aurora.scheduler.base.TaskTestUtil;
-import org.apache.aurora.scheduler.events.EventSink;
-import org.apache.aurora.scheduler.events.PubsubEvent;
-import org.apache.aurora.scheduler.storage.AttributeStore;
-import org.apache.aurora.scheduler.storage.CronJobStore;
-import org.apache.aurora.scheduler.storage.JobUpdateStore;
-import org.apache.aurora.scheduler.storage.QuotaStore;
-import org.apache.aurora.scheduler.storage.SchedulerStore;
-import org.apache.aurora.scheduler.storage.TaskStore;
-import org.apache.aurora.scheduler.storage.durability.DurableStorage.TransactionManager;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.LoggerFactory;
-
-import static org.easymock.EasyMock.expect;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class WriteAheadStorageTest extends EasyMockTest {
-
- private TransactionManager transactionManager;
- private TaskStore.Mutable taskStore;
- private AttributeStore.Mutable attributeStore;
- private JobUpdateStore.Mutable jobUpdateStore;
- private EventSink eventSink;
- private WriteAheadStorage storage;
-
- @Before
- public void setUp() {
- transactionManager = createMock(TransactionManager.class);
- taskStore = createMock(TaskStore.Mutable.class);
- attributeStore = createMock(AttributeStore.Mutable.class);
- jobUpdateStore = createMock(JobUpdateStore.Mutable.class);
- eventSink = createMock(EventSink.class);
-
- storage = new WriteAheadStorage(
- transactionManager,
- createMock(SchedulerStore.Mutable.class),
- createMock(CronJobStore.Mutable.class),
- taskStore,
- createMock(QuotaStore.Mutable.class),
- attributeStore,
- jobUpdateStore,
- LoggerFactory.getLogger(WriteAheadStorageTest.class),
- eventSink);
- }
-
- private void expectOp(Op op) {
- expect(transactionManager.hasActiveTransaction()).andReturn(true);
- transactionManager.log(op);
- }
-
- @Test
- public void testRemoveUpdates() {
- Set<IJobUpdateKey> removed = ImmutableSet.of(
- IJobUpdateKey.build(new JobUpdateKey(TaskTestUtil.JOB.newBuilder(), "a")),
- IJobUpdateKey.build(new JobUpdateKey(TaskTestUtil.JOB.newBuilder(), "b")));
- jobUpdateStore.removeJobUpdates(removed);
- // No operation is written since this Op is in read-only compatibility mode.
-
- control.replay();
-
- storage.removeJobUpdates(removed);
- }
-
- @Test
- public void testMutate() {
- String taskId = "a";
- Function<IScheduledTask, IScheduledTask> mutator =
- createMock(new Clazz<Function<IScheduledTask, IScheduledTask>>() { });
- Optional<IScheduledTask> mutated = Optional.of(TaskTestUtil.makeTask(taskId, TaskTestUtil.JOB));
-
- expect(taskStore.mutateTask(taskId, mutator)).andReturn(mutated);
- expectOp(Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))));
-
- control.replay();
-
- assertEquals(mutated, storage.mutateTask(taskId, mutator));
- }
-
- @Test
- public void testSaveHostAttributes() {
- IHostAttributes attributes = IHostAttributes.build(
- new HostAttributes()
- .setHost("a")
- .setMode(MaintenanceMode.DRAINING)
- .setAttributes(ImmutableSet.of(
- new Attribute().setName("b").setValues(ImmutableSet.of("1", "2")))));
-
- expect(attributeStore.saveHostAttributes(attributes)).andReturn(true);
- expectOp(Op.saveHostAttributes(
- new SaveHostAttributes().setHostAttributes(attributes.newBuilder())));
- eventSink.post(new PubsubEvent.HostAttributesChanged(attributes));
-
- expect(attributeStore.saveHostAttributes(attributes)).andReturn(false);
-
- control.replay();
-
- assertTrue(storage.saveHostAttributes(attributes));
-
- assertFalse(storage.saveHostAttributes(attributes));
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testDeleteAllTasks() {
- control.replay();
- storage.deleteAllTasks();
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testDeleteHostAttributes() {
- control.replay();
- storage.deleteHostAttributes();
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testDeleteJobs() {
- control.replay();
- storage.deleteJobs();
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testDeleteQuotas() {
- control.replay();
- storage.deleteQuotas();
- }
-
- @Test(expected = UnsupportedOperationException.class)
- public void testDeleteAllUpdatesAndEvents() {
- control.replay();
- storage.deleteAllUpdates();
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/test/java/org/apache/aurora/scheduler/storage/durability/WriteRecorderTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/durability/WriteRecorderTest.java b/src/test/java/org/apache/aurora/scheduler/storage/durability/WriteRecorderTest.java
new file mode 100644
index 0000000..1a89e83
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/storage/durability/WriteRecorderTest.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.durability;
+
+import java.util.Set;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.gen.Attribute;
+import org.apache.aurora.gen.HostAttributes;
+import org.apache.aurora.gen.JobUpdateKey;
+import org.apache.aurora.gen.MaintenanceMode;
+import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.gen.storage.SaveHostAttributes;
+import org.apache.aurora.gen.storage.SaveTasks;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
+import org.apache.aurora.scheduler.events.EventSink;
+import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.storage.AttributeStore;
+import org.apache.aurora.scheduler.storage.CronJobStore;
+import org.apache.aurora.scheduler.storage.JobUpdateStore;
+import org.apache.aurora.scheduler.storage.QuotaStore;
+import org.apache.aurora.scheduler.storage.SchedulerStore;
+import org.apache.aurora.scheduler.storage.TaskStore;
+import org.apache.aurora.scheduler.storage.durability.DurableStorage.TransactionManager;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.LoggerFactory;
+
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class WriteRecorderTest extends EasyMockTest {
+
+ private TransactionManager transactionManager;
+ private TaskStore.Mutable taskStore;
+ private AttributeStore.Mutable attributeStore;
+ private JobUpdateStore.Mutable jobUpdateStore;
+ private EventSink eventSink;
+ private WriteRecorder storage;
+
+ @Before
+ public void setUp() {
+ transactionManager = createMock(TransactionManager.class);
+ taskStore = createMock(TaskStore.Mutable.class);
+ attributeStore = createMock(AttributeStore.Mutable.class);
+ jobUpdateStore = createMock(JobUpdateStore.Mutable.class);
+ eventSink = createMock(EventSink.class);
+
+ storage = new WriteRecorder(
+ transactionManager,
+ createMock(SchedulerStore.Mutable.class),
+ createMock(CronJobStore.Mutable.class),
+ taskStore,
+ createMock(QuotaStore.Mutable.class),
+ attributeStore,
+ jobUpdateStore,
+ LoggerFactory.getLogger(WriteRecorderTest.class),
+ eventSink);
+ }
+
+ private void expectOp(Op op) {
+ expect(transactionManager.hasActiveTransaction()).andReturn(true);
+ transactionManager.log(op);
+ }
+
+ @Test
+ public void testRemoveUpdates() {
+ Set<IJobUpdateKey> removed = ImmutableSet.of(
+ IJobUpdateKey.build(new JobUpdateKey(TaskTestUtil.JOB.newBuilder(), "a")),
+ IJobUpdateKey.build(new JobUpdateKey(TaskTestUtil.JOB.newBuilder(), "b")));
+ jobUpdateStore.removeJobUpdates(removed);
+ // No operation is written since this Op is in read-only compatibility mode.
+
+ control.replay();
+
+ storage.removeJobUpdates(removed);
+ }
+
+ @Test
+ public void testMutate() {
+ String taskId = "a";
+ Function<IScheduledTask, IScheduledTask> mutator =
+ createMock(new Clazz<Function<IScheduledTask, IScheduledTask>>() { });
+ Optional<IScheduledTask> mutated = Optional.of(TaskTestUtil.makeTask(taskId, TaskTestUtil.JOB));
+
+ expect(taskStore.mutateTask(taskId, mutator)).andReturn(mutated);
+ expectOp(Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))));
+
+ control.replay();
+
+ assertEquals(mutated, storage.mutateTask(taskId, mutator));
+ }
+
+ @Test
+ public void testSaveHostAttributes() {
+ IHostAttributes attributes = IHostAttributes.build(
+ new HostAttributes()
+ .setHost("a")
+ .setMode(MaintenanceMode.DRAINING)
+ .setAttributes(ImmutableSet.of(
+ new Attribute().setName("b").setValues(ImmutableSet.of("1", "2")))));
+
+ expect(attributeStore.saveHostAttributes(attributes)).andReturn(true);
+ expectOp(Op.saveHostAttributes(
+ new SaveHostAttributes().setHostAttributes(attributes.newBuilder())));
+ eventSink.post(new PubsubEvent.HostAttributesChanged(attributes));
+
+ expect(attributeStore.saveHostAttributes(attributes)).andReturn(false);
+
+ control.replay();
+
+ assertTrue(storage.saveHostAttributes(attributes));
+
+ assertFalse(storage.saveHostAttributes(attributes));
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testDeleteAllTasks() {
+ control.replay();
+ storage.deleteAllTasks();
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testDeleteHostAttributes() {
+ control.replay();
+ storage.deleteHostAttributes();
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testDeleteJobs() {
+ control.replay();
+ storage.deleteJobs();
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testDeleteQuotas() {
+ control.replay();
+ storage.deleteQuotas();
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testDeleteAllUpdatesAndEvents() {
+ control.replay();
+ storage.deleteAllUpdates();
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/test/java/org/apache/aurora/scheduler/storage/log/LogPersistenceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/LogPersistenceTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/LogPersistenceTest.java
new file mode 100644
index 0000000..3d6d555
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/LogPersistenceTest.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.log;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+import org.apache.aurora.codec.ThriftBinaryCodec;
+import org.apache.aurora.common.inject.Bindings;
+import org.apache.aurora.common.stats.StatsProvider;
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.common.util.BuildInfo;
+import org.apache.aurora.common.util.Clock;
+import org.apache.aurora.common.util.testing.FakeBuildInfo;
+import org.apache.aurora.common.util.testing.FakeClock;
+import org.apache.aurora.gen.storage.LogEntry;
+import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.gen.storage.SaveTasks;
+import org.apache.aurora.gen.storage.Snapshot;
+import org.apache.aurora.gen.storage.Transaction;
+import org.apache.aurora.scheduler.TierModule;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
+import org.apache.aurora.scheduler.events.EventSink;
+import org.apache.aurora.scheduler.log.Log;
+import org.apache.aurora.scheduler.log.Log.Entry;
+import org.apache.aurora.scheduler.log.Log.Stream;
+import org.apache.aurora.scheduler.storage.Snapshotter;
+import org.apache.aurora.scheduler.storage.Storage.Volatile;
+import org.apache.aurora.scheduler.storage.durability.Persistence;
+import org.apache.aurora.scheduler.storage.durability.Persistence.Edit;
+import org.apache.aurora.scheduler.storage.log.LogStorageModule.Options;
+import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
+import org.apache.aurora.scheduler.testing.FakeStatsProvider;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
+
+public class LogPersistenceTest extends EasyMockTest {
+
+ private Persistence persistence;
+
+ private Log mockLog;
+ private Stream mockStream;
+
+ @Before
+ public void setUp() {
+ mockLog = createMock(Log.class);
+ mockStream = createMock(Stream.class);
+
+ Injector injector = Guice.createInjector(
+ new LogStorageModule(new Options()),
+ new MemStorageModule(Bindings.annotatedKeyFactory(Volatile.class)),
+ new TierModule(TaskTestUtil.TIER_CONFIG),
+ new AbstractModule() {
+ @Override
+ protected void configure() {
+ bind(StatsProvider.class).toInstance(new FakeStatsProvider());
+ bind(EventSink.class).toInstance(e -> { });
+ bind(BuildInfo.class).toInstance(FakeBuildInfo.generateBuildInfo());
+ bind(Clock.class).toInstance(new FakeClock());
+ bind(Snapshotter.class).to(SnapshotStoreImpl.class);
+ bind(Log.class).toInstance(mockLog);
+ }
+ }
+ );
+
+ persistence = injector.getInstance(Persistence.class);
+ }
+
+ @Test
+ public void testRecoverEmpty() throws Exception {
+ expect(mockLog.open()).andReturn(mockStream);
+ List<Entry> empty = ImmutableList.of();
+ expect(mockStream.readAll()).andReturn(empty.iterator());
+
+ control.replay();
+
+ persistence.prepare();
+ assertEquals(ImmutableList.of(), persistence.recover().collect(Collectors.toList()));
+ }
+
+ @Test
+ public void testRecoverSnapshot() throws Exception {
+ expect(mockLog.open()).andReturn(mockStream);
+
+ Op saveA = Op.saveTasks(new SaveTasks().setTasks(ImmutableSet.of(
+ TaskTestUtil.makeTask("a", TaskTestUtil.JOB).newBuilder())));
+ Op saveB = Op.saveTasks(new SaveTasks().setTasks(ImmutableSet.of(
+ TaskTestUtil.makeTask("b", TaskTestUtil.JOB).newBuilder())));
+ Op saveC = Op.saveTasks(new SaveTasks().setTasks(ImmutableSet.of(
+ TaskTestUtil.makeTask("c", TaskTestUtil.JOB).newBuilder())));
+
+ List<Entry> entries = ImmutableList.of(
+ logEntry(LogEntry.transaction(new Transaction().setOps(ImmutableList.of(saveA)))),
+ logEntry(LogEntry.snapshot(new Snapshot().setTasks(saveB.getSaveTasks().getTasks()))),
+ logEntry(LogEntry.transaction(new Transaction().setOps(ImmutableList.of(saveC)))));
+
+ expect(mockStream.readAll()).andReturn(entries.iterator());
+
+ control.replay();
+
+ persistence.prepare();
+ assertEquals(
+ ImmutableList.of(
+ Edit.op(saveA),
+ Edit.deleteAll(),
+ Edit.op(saveB),
+ Edit.op(saveC)),
+ persistence.recover().collect(Collectors.toList()));
+ }
+
+ private static Entry logEntry(LogEntry entry) {
+ return () -> ThriftBinaryCodec.encodeNonNull(entry);
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/test/java/org/apache/aurora/scheduler/storage/log/NonVolatileStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/NonVolatileStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/NonVolatileStorageTest.java
index eb966d7..fdde73d 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/NonVolatileStorageTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/NonVolatileStorageTest.java
@@ -21,7 +21,6 @@ import com.google.common.collect.Lists;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
-import com.google.inject.TypeLiteral;
import org.apache.aurora.common.application.ShutdownRegistry;
import org.apache.aurora.common.application.ShutdownRegistry.ShutdownRegistryImpl;
@@ -35,15 +34,14 @@ import org.apache.aurora.common.util.BuildInfo;
import org.apache.aurora.common.util.Clock;
import org.apache.aurora.common.util.testing.FakeBuildInfo;
import org.apache.aurora.common.util.testing.FakeClock;
-import org.apache.aurora.gen.storage.Snapshot;
import org.apache.aurora.scheduler.TierModule;
import org.apache.aurora.scheduler.config.types.DataAmount;
import org.apache.aurora.scheduler.config.types.TimeAmount;
import org.apache.aurora.scheduler.events.EventSink;
import org.apache.aurora.scheduler.log.Log;
import org.apache.aurora.scheduler.resources.ResourceTestUtil;
-import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
import org.apache.aurora.scheduler.storage.SnapshotStore;
+import org.apache.aurora.scheduler.storage.Snapshotter;
import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult.Quiet;
import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
@@ -62,7 +60,7 @@ public class NonVolatileStorageTest extends TearDownTestCase {
private FakeLog log;
private Runnable teardown = () -> { };
private NonVolatileStorage storage;
- private DistributedSnapshotStore snapshotStore;
+ private SnapshotStore snapshotStore;
@Before
public void setUp() {
@@ -92,12 +90,12 @@ public class NonVolatileStorageTest extends TearDownTestCase {
bind(ShutdownRegistry.class).toInstance(shutdownRegistry);
bind(StatsProvider.class).toInstance(new FakeStatsProvider());
bind(Log.class).toInstance(log);
- bind(new TypeLiteral<SnapshotStore<Snapshot>>() { }).to(SnapshotStoreImpl.class);
+ bind(Snapshotter.class).to(SnapshotStoreImpl.class);
}
}
);
storage = injector.getInstance(NonVolatileStorage.class);
- snapshotStore = injector.getInstance(DistributedSnapshotStore.class);
+ snapshotStore = injector.getInstance(SnapshotStore.class);
storage.prepare();
storage.start(w -> { });
http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotServiceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotServiceTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotServiceTest.java
new file mode 100644
index 0000000..270453d
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotServiceTest.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.log;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Key;
+
+import org.apache.aurora.GuavaUtils.ServiceManagerIface;
+import org.apache.aurora.common.application.ShutdownRegistry.ShutdownRegistryImpl;
+import org.apache.aurora.common.application.ShutdownStage;
+import org.apache.aurora.common.base.Command;
+import org.apache.aurora.common.inject.Bindings;
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.stats.StatsProvider;
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.gen.storage.Snapshot;
+import org.apache.aurora.scheduler.SchedulerLifecycle.SchedulerActive;
+import org.apache.aurora.scheduler.SchedulerServicesModule;
+import org.apache.aurora.scheduler.TierModule;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
+import org.apache.aurora.scheduler.config.types.TimeAmount;
+import org.apache.aurora.scheduler.events.EventSink;
+import org.apache.aurora.scheduler.log.Log;
+import org.apache.aurora.scheduler.log.Log.Entry;
+import org.apache.aurora.scheduler.log.Log.Position;
+import org.apache.aurora.scheduler.log.Log.Stream;
+import org.apache.aurora.scheduler.storage.SnapshotStore;
+import org.apache.aurora.scheduler.storage.Snapshotter;
+import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
+import org.apache.aurora.scheduler.storage.Storage.Volatile;
+import org.apache.aurora.scheduler.storage.log.LogStorageModule.Options;
+import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
+import org.apache.aurora.scheduler.testing.FakeStatsProvider;
+import org.easymock.IAnswer;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+
+public class SnapshotServiceTest extends EasyMockTest {
+
+ private static final Snapshot SNAPSHOT = new Snapshot().setTasks(
+ ImmutableSet.of(TaskTestUtil.makeTask("a", TaskTestUtil.JOB).newBuilder()));
+
+ private NonVolatileStorage storage;
+ private SnapshotStore snapshotStore;
+ private ServiceManagerIface serviceManager;
+
+ private Snapshotter mockSnapshotter;
+ private Log mockLog;
+ private Stream mockStream;
+ private Position mockPosition;
+
+ private void setUp(Amount<Long, Time> snapshotInterval) {
+ mockSnapshotter = createMock(Snapshotter.class);
+ mockLog = createMock(Log.class);
+ mockStream = createMock(Stream.class);
+ mockPosition = createMock(Position.class);
+
+ Options options = new Options();
+ options.snapshotInterval =
+ new TimeAmount(snapshotInterval.getValue(), snapshotInterval.getUnit());
+
+ Injector injector = Guice.createInjector(
+ new SchedulerServicesModule(),
+ new LogStorageModule(options),
+ new MemStorageModule(Bindings.annotatedKeyFactory(Volatile.class)),
+ new TierModule(TaskTestUtil.TIER_CONFIG),
+ new AbstractModule() {
+ @Override
+ protected void configure() {
+ bind(Key.get(Command.class, ShutdownStage.class)).to(ShutdownRegistryImpl.class);
+ bind(StatsProvider.class).toInstance(new FakeStatsProvider());
+ bind(EventSink.class).toInstance(e -> { });
+ bind(Snapshotter.class).toInstance(mockSnapshotter);
+ bind(Log.class).toInstance(mockLog);
+ }
+ }
+ );
+
+ storage = injector.getInstance(NonVolatileStorage.class);
+ snapshotStore = injector.getInstance(SnapshotStore.class);
+ serviceManager =
+ injector.getInstance(Key.get(ServiceManagerIface.class, SchedulerActive.class));
+ }
+
+ private void expectStorageInitialized() throws Exception {
+ expect(mockLog.open()).andReturn(mockStream);
+ List<Entry> empty = ImmutableList.of();
+ expect(mockStream.readAll()).andReturn(empty.iterator());
+ }
+
+ private void expectSnapshotPersist(CountDownLatch latch) {
+ expect(mockStream.append(anyObject())).andReturn(mockPosition).atLeastOnce();
+ mockStream.truncateBefore(mockPosition);
+ expectLastCall().andAnswer((IAnswer<Void>) () -> {
+ latch.countDown();
+ return null;
+ }).atLeastOnce();
+ }
+
+ @Test
+ public void testPeriodicSnapshots() throws Exception {
+ setUp(Amount.of(1L, Time.MILLISECONDS));
+
+ expectStorageInitialized();
+
+ expect(mockSnapshotter.from(anyObject())).andReturn(SNAPSHOT).atLeastOnce();
+
+ CountDownLatch snapshotCalled = new CountDownLatch(2);
+ expectSnapshotPersist(snapshotCalled);
+
+ control.replay();
+
+ storage.prepare();
+ storage.start(stores -> { });
+ serviceManager.startAsync().awaitHealthy();
+
+ snapshotCalled.await();
+
+ serviceManager.stopAsync().awaitStopped(10, TimeUnit.SECONDS);
+ }
+
+ @Test
+ public void testExplicitInternalSnapshot() throws Exception {
+ setUp(Amount.of(1L, Time.HOURS));
+
+ expectStorageInitialized();
+
+ expect(mockSnapshotter.from(anyObject())).andReturn(SNAPSHOT);
+ expectSnapshotPersist(new CountDownLatch(1));
+
+ control.replay();
+
+ storage.prepare();
+ storage.start(stores -> { });
+ snapshotStore.snapshot();
+ }
+
+ @Test
+ public void testExplicitProvidedSnapshot() throws Exception {
+ setUp(Amount.of(1L, Time.HOURS));
+
+ expectStorageInitialized();
+ expectSnapshotPersist(new CountDownLatch(1));
+
+ control.replay();
+
+ storage.prepare();
+ storage.start(stores -> { });
+ snapshotStore.snapshotWith(SNAPSHOT);
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java
index 5634f92..2ad4e84 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java
@@ -18,12 +18,8 @@ import java.util.Map;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
-import com.google.inject.AbstractModule;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
import org.apache.aurora.common.stats.Stats;
-import org.apache.aurora.common.stats.StatsProvider;
import org.apache.aurora.common.util.testing.FakeBuildInfo;
import org.apache.aurora.common.util.testing.FakeClock;
import org.apache.aurora.gen.Attribute;
@@ -55,6 +51,9 @@ import org.apache.aurora.scheduler.base.JobKeys;
import org.apache.aurora.scheduler.base.TaskTestUtil;
import org.apache.aurora.scheduler.resources.ResourceBag;
import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
+import org.apache.aurora.scheduler.storage.durability.Loader;
+import org.apache.aurora.scheduler.storage.durability.Persistence.Edit;
import org.apache.aurora.scheduler.storage.durability.ThriftBackfill;
import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
@@ -65,10 +64,10 @@ import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
-import org.apache.aurora.scheduler.testing.FakeStatsProvider;
import org.junit.Test;
import static org.apache.aurora.common.util.testing.FakeBuildInfo.generateBuildInfo;
+import static org.apache.aurora.scheduler.base.TaskTestUtil.THRIFT_BACKFILL;
import static org.apache.aurora.scheduler.resources.ResourceManager.aggregateFromBag;
import static org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl.SNAPSHOT_RESTORE;
import static org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl.SNAPSHOT_SAVE;
@@ -80,35 +79,27 @@ public class SnapshotStoreImplIT {
private static final long NOW = 10335463456L;
private static final IJobKey JOB_KEY = JobKeys.from("role", "env", "job");
- private SnapshotStoreImpl snapshotStore;
+ private Storage storage;
+ private SnapshotStoreImpl snapshotter;
private void setUpStore() {
- Injector injector = Guice.createInjector(
- new MemStorageModule(),
- new AbstractModule() {
- @Override
- protected void configure() {
- bind(StatsProvider.class).toInstance(new FakeStatsProvider());
- }
- });
-
+ storage = MemStorageModule.newEmptyStorage();
FakeClock clock = new FakeClock();
clock.setNowMillis(NOW);
- snapshotStore = new SnapshotStoreImpl(
- generateBuildInfo(),
- clock,
- injector.getInstance(Storage.class),
- TaskTestUtil.THRIFT_BACKFILL);
+ snapshotter = new SnapshotStoreImpl(generateBuildInfo(), clock);
Stats.flush();
}
@Test
public void testBackfill() {
setUpStore();
- snapshotStore.applySnapshot(makeNonBackfilled());
+ storage.write((NoResult.Quiet) stores ->
+ Loader.load(
+ stores,
+ THRIFT_BACKFILL,
+ snapshotter.asStream(makeNonBackfilled()).map(Edit::op)));
- Snapshot backfilled = snapshotStore.createSnapshot();
- assertEquals(expected(), backfilled);
+ assertEquals(expected(), storage.write(snapshotter::from));
assertSnapshotRestoreStats(1L);
assertSnapshotSaveStats(1L);
}
@@ -183,14 +174,14 @@ public class SnapshotStoreImplIT {
}
private void assertSnapshotSaveStats(long count) {
- for (String stat : snapshotStore.snapshotFieldNames()) {
+ for (String stat : snapshotter.snapshotFieldNames()) {
assertEquals(count, Stats.getVariable(SNAPSHOT_SAVE + stat + "_events").read());
assertNotNull(Stats.getVariable(SNAPSHOT_SAVE + stat + "_nanos_total"));
}
}
private void assertSnapshotRestoreStats(long count) {
- for (String stat : snapshotStore.snapshotFieldNames()) {
+ for (String stat : snapshotter.snapshotFieldNames()) {
assertEquals(count, Stats.getVariable(SNAPSHOT_RESTORE + stat + "_events").read());
assertNotNull(Stats.getVariable(SNAPSHOT_RESTORE + stat + "_nanos_total"));
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/test/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java b/src/test/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java
index fd81bff..64fbb54 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/testing/StorageTestUtil.java
@@ -83,9 +83,9 @@ public class StorageTestUtil {
}
/**
- * Expects any number of read or write operations.
+ * Expects any number of calls to fetch individual stores.
*/
- public void expectOperations() {
+ public void expectStoreAccesses() {
expect(storeProvider.getTaskStore()).andReturn(taskStore).anyTimes();
expect(storeProvider.getQuotaStore()).andReturn(quotaStore).anyTimes();
expect(storeProvider.getAttributeStore()).andReturn(attributeStore).anyTimes();
@@ -99,6 +99,13 @@ public class StorageTestUtil {
expect(mutableStoreProvider.getCronJobStore()).andReturn(jobStore).anyTimes();
expect(mutableStoreProvider.getSchedulerStore()).andReturn(schedulerStore).anyTimes();
expect(mutableStoreProvider.getJobUpdateStore()).andReturn(jobUpdateStore).anyTimes();
+ }
+
+ /**
+ * Expects any number of read or write operations.
+ */
+ public void expectOperations() {
+ expectStoreAccesses();
expectRead().anyTimes();
expectWrite().anyTimes();
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
index 919ac14..040baf9 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
@@ -84,7 +84,7 @@ import org.apache.aurora.scheduler.state.MaintenanceController;
import org.apache.aurora.scheduler.state.StateChangeResult;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.state.UUIDGenerator;
-import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
+import org.apache.aurora.scheduler.storage.SnapshotStore;
import org.apache.aurora.scheduler.storage.Storage.StorageException;
import org.apache.aurora.scheduler.storage.backup.Recovery;
import org.apache.aurora.scheduler.storage.backup.StorageBackup;
@@ -178,7 +178,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
ImmutableSet.of(new Metadata("k1", "v1"), new Metadata("k2", "v2"));
private StorageTestUtil storageUtil;
- private DistributedSnapshotStore snapshotStore;
+ private SnapshotStore snapshotStore;
private StorageBackup backup;
private Recovery recovery;
private MaintenanceController maintenance;
@@ -197,7 +197,7 @@ public class SchedulerThriftInterfaceTest extends EasyMockTest {
public void setUp() throws Exception {
storageUtil = new StorageTestUtil(this);
storageUtil.expectOperations();
- snapshotStore = createMock(DistributedSnapshotStore.class);
+ snapshotStore = createMock(SnapshotStore.class);
backup = createMock(StorageBackup.class);
recovery = createMock(Recovery.class);
maintenance = createMock(MaintenanceController.class);
http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java b/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
index bb0fd89..231fd8d 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
@@ -62,7 +62,7 @@ import org.apache.aurora.scheduler.quota.QuotaModule;
import org.apache.aurora.scheduler.resources.ResourceTestUtil;
import org.apache.aurora.scheduler.resources.ResourceType;
import org.apache.aurora.scheduler.stats.StatsModule;
-import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
+import org.apache.aurora.scheduler.storage.SnapshotStore;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
import org.apache.aurora.scheduler.storage.backup.Recovery;
@@ -136,7 +136,7 @@ public class ThriftIT extends EasyMockTest {
bind(FrameworkInfoFactoryImpl.class).in(Singleton.class);
bindMock(Recovery.class);
bindMock(StorageBackup.class);
- bindMock(DistributedSnapshotStore.class);
+ bindMock(SnapshotStore.class);
bind(IServerInfo.class).toInstance(SERVER_INFO);
}
[2/2] aurora git commit: Recover snapshots via the Op stream
Posted by wf...@apache.org.
Recover snapshots via the Op stream
This cleans up the various interfaces around persisting and recovering from
`Snapshot`s. Most importantly, `LogPersistence` no longer bypasses the
`recover()` `Op` stream to apply snapshots. As a result, it should be
straightforward to build a migration utility that clones `LogPersistence`
state into another `Persistence` implementation.
Reviewed at https://reviews.apache.org/r/64286/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/5f79f7ca
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/5f79f7ca
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/5f79f7ca
Branch: refs/heads/master
Commit: 5f79f7ca7c62f053f66a9ea925cebb78a644ce54
Parents: 4489dc3
Author: Bill Farner <wf...@apache.org>
Authored: Wed Dec 13 20:37:57 2017 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Wed Dec 13 20:37:57 2017 -0800
----------------------------------------------------------------------
.../aurora/benchmark/SnapshotBenchmarks.java | 4 +-
.../storage/DistributedSnapshotStore.java | 39 --
.../aurora/scheduler/storage/SnapshotStore.java | 25 +-
.../aurora/scheduler/storage/Snapshotter.java | 43 +++
.../scheduler/storage/backup/BackupModule.java | 14 +-
.../scheduler/storage/backup/Recovery.java | 10 +-
.../scheduler/storage/backup/StorageBackup.java | 27 +-
.../storage/backup/TemporaryStorage.java | 21 +-
.../storage/durability/DurableStorage.java | 155 +-------
.../scheduler/storage/durability/Loader.java | 150 ++++++++
.../storage/durability/Persistence.java | 56 ++-
.../storage/durability/WriteAheadStorage.java | 368 -------------------
.../storage/durability/WriteRecorder.java | 368 +++++++++++++++++++
.../scheduler/storage/log/LogPersistence.java | 206 ++---------
.../scheduler/storage/log/LogStorageModule.java | 86 ++---
.../scheduler/storage/log/SnapshotService.java | 121 ++++++
.../storage/log/SnapshotStoreImpl.java | 236 ++++++------
.../thrift/SchedulerThriftInterface.java | 6 +-
.../scheduler/app/local/LocalSchedulerMain.java | 4 +-
.../scheduler/config/CommandLineTest.java | 2 -
.../scheduler/storage/backup/RecoveryTest.java | 22 +-
.../storage/backup/StorageBackupTest.java | 45 ++-
.../storage/durability/DurableStorageTest.java | 53 +--
.../durability/WriteAheadStorageTest.java | 166 ---------
.../storage/durability/WriteRecorderTest.java | 166 +++++++++
.../storage/log/LogPersistenceTest.java | 134 +++++++
.../storage/log/NonVolatileStorageTest.java | 10 +-
.../storage/log/SnapshotServiceTest.java | 174 +++++++++
.../storage/log/SnapshotStoreImplIT.java | 41 +--
.../storage/testing/StorageTestUtil.java | 11 +-
.../thrift/SchedulerThriftInterfaceTest.java | 6 +-
.../aurora/scheduler/thrift/ThriftIT.java | 4 +-
32 files changed, 1578 insertions(+), 1195 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/jmh/java/org/apache/aurora/benchmark/SnapshotBenchmarks.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/SnapshotBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/SnapshotBenchmarks.java
index 755582d..4f99f80 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/SnapshotBenchmarks.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/SnapshotBenchmarks.java
@@ -75,7 +75,7 @@ public class SnapshotBenchmarks {
@Benchmark
public boolean run() throws TException {
- snapshotStore.applySnapshot(snapshot);
+ snapshotStore.asStream(snapshot);
// Return non-guessable result to satisfy "blackhole" requirement.
return System.currentTimeMillis() % 5 == 0;
}
@@ -103,7 +103,7 @@ public class SnapshotBenchmarks {
.setNumInstanceEvents(instanceEvents)
.build(updates));
- return snapshotStore.createSnapshot();
+ return storage.write(snapshotStore::from);
}
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/main/java/org/apache/aurora/scheduler/storage/DistributedSnapshotStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/DistributedSnapshotStore.java b/src/main/java/org/apache/aurora/scheduler/storage/DistributedSnapshotStore.java
deleted file mode 100644
index 0c6a955..0000000
--- a/src/main/java/org/apache/aurora/scheduler/storage/DistributedSnapshotStore.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage;
-
-import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
-import org.apache.aurora.gen.storage.Snapshot;
-import org.apache.aurora.scheduler.storage.Storage.StorageException;
-
-/**
- * A distributed snapshot store that supports persisting globally-visible snapshots.
- */
-public interface DistributedSnapshotStore {
-
- /**
- * Clean up the underlying storage by optimizing internal data structures. Does not change
- * externally-visible state but might not run concurrently with write operations.
- */
- void snapshot() throws StorageException;
-
- /**
- * Identical to {@link #snapshot()}, using a custom {@link Snapshot} rather than an
- * internally-generated one based on the current state.
- *
- * @param snapshot Snapshot to write.
- * @throws CodingException If the snapshot could not be serialized.
- */
- void snapshotWith(Snapshot snapshot) throws CodingException;
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/main/java/org/apache/aurora/scheduler/storage/SnapshotStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/SnapshotStore.java b/src/main/java/org/apache/aurora/scheduler/storage/SnapshotStore.java
index 6b5e5dd..ab109ab 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/SnapshotStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/SnapshotStore.java
@@ -13,24 +13,27 @@
*/
package org.apache.aurora.scheduler.storage;
+import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
+import org.apache.aurora.gen.storage.Snapshot;
+import org.apache.aurora.scheduler.storage.Storage.StorageException;
+
/**
- * Storage mechanism that is able to create complete snapshots of the local storage system state
- * and apply these to restore local storage from a snapshotted baseline.
+ * A storage component that applies full-state snapshots.
*/
-public interface SnapshotStore<T> {
+public interface SnapshotStore {
/**
- * Creates a consistent snapshot of the local storage system.
- *
- * @return A blob that can be used to recover local storage via {@link #applySnapshot(Object)}.
+ * Clean up the underlying storage by optimizing internal data structures. Does not change
+ * externally-visible state but might not run concurrently with write operations.
*/
- T createSnapshot();
+ void snapshot() throws StorageException;
/**
- * Applies a snapshot blob to the local storage system, wiping out all existing data and
- * resetting with the contents of the snapshot.
+ * Identical to {@link #snapshot()}, using a custom {@link Snapshot} rather than an
+ * internally-generated one based on the current state.
*
- * @param snapshot A snapshot blob created by {@link #createSnapshot()}.
+ * @param snapshot Snapshot to write.
+ * @throws CodingException If the snapshot could not be serialized.
*/
- void applySnapshot(T snapshot);
+ void snapshotWith(Snapshot snapshot) throws CodingException;
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/main/java/org/apache/aurora/scheduler/storage/Snapshotter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/Snapshotter.java b/src/main/java/org/apache/aurora/scheduler/storage/Snapshotter.java
new file mode 100644
index 0000000..0966faf
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/Snapshotter.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage;
+
+import java.util.stream.Stream;
+
+import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.gen.storage.Snapshot;
+import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
+
+/**
+ * Logic to convert storage contents into a snapshot, and a snapshot into a stream of storage
+ * operations.
+ */
+public interface Snapshotter {
+
+ /**
+ * Creates a snapshot from the contents of storage.
+ *
+ * @param stores stores to create a snapshot from.
+ * @return A snapshot that can be used to recover storage.
+ */
+ Snapshot from(StoreProvider stores);
+
+ /**
+ * Converts a snapshot into an equivalent linear stream of storage operations.
+ *
+ * @param snapshot A snapshot created by {@link #from(StoreProvider)}.
+ * @return a stream of operations representing the contents of the snapshot.
+ */
+ Stream<Op> asStream(Snapshot snapshot);
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/main/java/org/apache/aurora/scheduler/storage/backup/BackupModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/BackupModule.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/BackupModule.java
index 7eaae89..4397c1e 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/backup/BackupModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/BackupModule.java
@@ -32,7 +32,7 @@ import org.apache.aurora.common.quantity.Time;
import org.apache.aurora.gen.storage.Snapshot;
import org.apache.aurora.scheduler.base.AsyncUtil;
import org.apache.aurora.scheduler.config.types.TimeAmount;
-import org.apache.aurora.scheduler.storage.SnapshotStore;
+import org.apache.aurora.scheduler.storage.Snapshotter;
import org.apache.aurora.scheduler.storage.backup.Recovery.RecoveryImpl;
import org.apache.aurora.scheduler.storage.backup.StorageBackup.StorageBackupImpl;
import org.apache.aurora.scheduler.storage.backup.StorageBackup.StorageBackupImpl.BackupConfig;
@@ -66,9 +66,9 @@ public class BackupModule extends PrivateModule {
}
private final Options options;
- private final Class<? extends SnapshotStore<Snapshot>> snapshotStore;
+ private final Class<? extends Snapshotter> snapshotStore;
- public BackupModule(Options options, Class<? extends SnapshotStore<Snapshot>> snapshotStore) {
+ public BackupModule(Options options, Class<? extends Snapshotter> snapshotStore) {
this.options = options;
this.snapshotStore = snapshotStore;
}
@@ -78,13 +78,13 @@ public class BackupModule extends PrivateModule {
Executor executor = AsyncUtil.singleThreadLoggingScheduledExecutor("StorageBackup-%d", LOG);
bind(Executor.class).toInstance(executor);
- TypeLiteral<SnapshotStore<Snapshot>> type = new TypeLiteral<SnapshotStore<Snapshot>>() { };
- bind(type).annotatedWith(StorageBackupImpl.SnapshotDelegate.class).to(snapshotStore);
+ bind(Snapshotter.class).annotatedWith(StorageBackupImpl.SnapshotDelegate.class)
+ .to(snapshotStore);
- bind(type).to(StorageBackupImpl.class);
+ bind(Snapshotter.class).to(StorageBackupImpl.class);
bind(StorageBackup.class).to(StorageBackupImpl.class);
bind(StorageBackupImpl.class).in(Singleton.class);
- expose(type);
+ expose(Snapshotter.class);
expose(StorageBackup.class);
bind(new TypeLiteral<Function<Snapshot, TemporaryStorage>>() { })
http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java
index 3a62f02..79899a0 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java
@@ -31,7 +31,7 @@ import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
import org.apache.aurora.common.base.Command;
import org.apache.aurora.gen.storage.Snapshot;
import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
+import org.apache.aurora.scheduler.storage.SnapshotStore;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -110,7 +110,7 @@ public interface Recovery {
private final Function<Snapshot, TemporaryStorage> tempStorageFactory;
private final AtomicReference<PendingRecovery> recovery;
private final Storage primaryStorage;
- private final DistributedSnapshotStore distributedStore;
+ private final SnapshotStore snapshotStore;
private final Command shutDownNow;
@Inject
@@ -118,14 +118,14 @@ public interface Recovery {
File backupDir,
Function<Snapshot, TemporaryStorage> tempStorageFactory,
Storage primaryStorage,
- DistributedSnapshotStore distributedStore,
+ SnapshotStore snapshotStore,
Command shutDownNow) {
this.backupDir = requireNonNull(backupDir);
this.tempStorageFactory = requireNonNull(tempStorageFactory);
this.recovery = Atomics.newReference();
this.primaryStorage = requireNonNull(primaryStorage);
- this.distributedStore = requireNonNull(distributedStore);
+ this.snapshotStore = requireNonNull(snapshotStore);
this.shutDownNow = requireNonNull(shutDownNow);
}
@@ -197,7 +197,7 @@ public interface Recovery {
void commit() {
primaryStorage.write((NoResult.Quiet) storeProvider -> {
try {
- distributedStore.snapshotWith(tempStorage.toSnapshot());
+ snapshotStore.snapshotWith(tempStorage.toSnapshot());
shutDownNow.execute();
} catch (CodingException e) {
throw new IllegalStateException("Failed to encode snapshot.", e);
http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/main/java/org/apache/aurora/scheduler/storage/backup/StorageBackup.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/StorageBackup.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/StorageBackup.java
index 2d61678..1675893 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/backup/StorageBackup.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/StorageBackup.java
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.Locale;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Stream;
import javax.inject.Inject;
import javax.inject.Qualifier;
@@ -42,8 +43,11 @@ import org.apache.aurora.common.quantity.Amount;
import org.apache.aurora.common.quantity.Time;
import org.apache.aurora.common.stats.Stats;
import org.apache.aurora.common.util.Clock;
+import org.apache.aurora.gen.storage.Op;
import org.apache.aurora.gen.storage.Snapshot;
-import org.apache.aurora.scheduler.storage.SnapshotStore;
+import org.apache.aurora.scheduler.storage.Snapshotter;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
@@ -69,7 +73,7 @@ public interface StorageBackup {
*/
void backupNow();
- class StorageBackupImpl implements StorageBackup, SnapshotStore<Snapshot> {
+ class StorageBackupImpl implements StorageBackup, Snapshotter {
private static final Logger LOG = LoggerFactory.getLogger(StorageBackupImpl.class);
private static final String FILE_PREFIX = "scheduler-backup-";
@@ -93,13 +97,14 @@ public interface StorageBackup {
}
/**
- * Binding annotation that the underlying {@link SnapshotStore} must be bound with.
+ * Binding annotation that the underlying {@link Snapshotter} must be bound with.
*/
@Qualifier
@Target({FIELD, PARAMETER, METHOD}) @Retention(RUNTIME)
@interface SnapshotDelegate { }
- private final SnapshotStore<Snapshot> delegate;
+ private final Storage storage;
+ private final Snapshotter delegate;
private final Clock clock;
private final long backupIntervalMs;
private volatile long lastBackupMs;
@@ -120,11 +125,13 @@ public interface StorageBackup {
@Inject
StorageBackupImpl(
- @SnapshotDelegate SnapshotStore<Snapshot> delegate,
+ Storage storage,
+ @SnapshotDelegate Snapshotter delegate,
Clock clock,
BackupConfig config,
Executor executor) {
+ this.storage = requireNonNull(storage);
this.delegate = requireNonNull(delegate);
this.clock = requireNonNull(clock);
this.config = requireNonNull(config);
@@ -135,8 +142,8 @@ public interface StorageBackup {
}
@Override
- public Snapshot createSnapshot() {
- final Snapshot snapshot = delegate.createSnapshot();
+ public Snapshot from(StoreProvider stores) {
+ Snapshot snapshot = delegate.from(stores);
if (clock.nowMillis() >= (lastBackupMs + backupIntervalMs)) {
executor.execute(() -> save(snapshot));
}
@@ -145,7 +152,7 @@ public interface StorageBackup {
@Override
public void backupNow() {
- save(delegate.createSnapshot());
+ save(storage.write(delegate::from));
}
@VisibleForTesting
@@ -210,8 +217,8 @@ public interface StorageBackup {
static final Function<File, String> FILE_NAME = File::getName;
@Override
- public void applySnapshot(Snapshot snapshot) {
- delegate.applySnapshot(snapshot);
+ public Stream<Op> asStream(Snapshot snapshot) {
+ return delegate.asStream(snapshot);
}
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
index 18296b0..0305d9d 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
@@ -24,9 +24,11 @@ import org.apache.aurora.common.util.testing.FakeClock;
import org.apache.aurora.gen.storage.Snapshot;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.storage.SnapshotStore;
+import org.apache.aurora.scheduler.storage.Snapshotter;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
+import org.apache.aurora.scheduler.storage.durability.Loader;
+import org.apache.aurora.scheduler.storage.durability.Persistence.Edit;
import org.apache.aurora.scheduler.storage.durability.ThriftBackfill;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl;
@@ -78,16 +80,15 @@ interface TemporaryStorage {
@Override
public TemporaryStorage apply(Snapshot snapshot) {
- final Storage storage = MemStorageModule.newEmptyStorage();
- final BuildInfo buildInfo = generateBuildInfo();
+ Storage storage = MemStorageModule.newEmptyStorage();
+ BuildInfo buildInfo = generateBuildInfo();
FakeClock clock = new FakeClock();
clock.setNowMillis(snapshot.getTimestamp());
- final SnapshotStore<Snapshot> snapshotStore = new SnapshotStoreImpl(
- buildInfo,
- clock,
- storage,
- thriftBackfill);
- snapshotStore.applySnapshot(snapshot);
+ Snapshotter snapshotter = new SnapshotStoreImpl(buildInfo, clock);
+
+ storage.write((NoResult.Quiet) stores -> {
+ Loader.load(stores, thriftBackfill, snapshotter.asStream(snapshot).map(Edit::op));
+ });
return new TemporaryStorage() {
@Override
@@ -107,7 +108,7 @@ interface TemporaryStorage {
@Override
public Snapshot toSnapshot() {
- return snapshotStore.createSnapshot();
+ return storage.write(snapshotter::from);
}
};
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/main/java/org/apache/aurora/scheduler/storage/durability/DurableStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/DurableStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/DurableStorage.java
index 6a7c0ad..f1fdc27 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/durability/DurableStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/durability/DurableStorage.java
@@ -14,23 +14,13 @@
package org.apache.aurora.scheduler.storage.durability;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.Consumer;
import javax.inject.Inject;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableMap;
-
import org.apache.aurora.common.inject.TimedInterceptor.Timed;
import org.apache.aurora.common.stats.SlidingStats;
-import org.apache.aurora.gen.HostAttributes;
import org.apache.aurora.gen.storage.Op;
-import org.apache.aurora.gen.storage.SaveCronJob;
-import org.apache.aurora.gen.storage.SaveJobInstanceUpdateEvent;
-import org.apache.aurora.gen.storage.SaveJobUpdateEvent;
-import org.apache.aurora.gen.storage.SaveQuota;
import org.apache.aurora.scheduler.base.SchedulerException;
import org.apache.aurora.scheduler.events.EventSink;
import org.apache.aurora.scheduler.storage.AttributeStore;
@@ -43,12 +33,6 @@ import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
import org.apache.aurora.scheduler.storage.TaskStore;
import org.apache.aurora.scheduler.storage.durability.Persistence.PersistenceException;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
-import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.util.Objects.requireNonNull;
@@ -101,32 +85,17 @@ public class DurableStorage implements NonVolatileStorage {
void log(Op op);
}
- private static final Logger LOG = LoggerFactory.getLogger(DurableStorage.class);
-
private final Persistence persistence;
private final Storage writeBehindStorage;
- private final SchedulerStore.Mutable writeBehindSchedulerStore;
- private final CronJobStore.Mutable writeBehindJobStore;
- private final TaskStore.Mutable writeBehindTaskStore;
- private final QuotaStore.Mutable writeBehindQuotaStore;
- private final AttributeStore.Mutable writeBehindAttributeStore;
- private final JobUpdateStore.Mutable writeBehindJobUpdateStore;
private final ReentrantLock writeLock;
private final ThriftBackfill thriftBackfill;
- private final WriteAheadStorage writeAheadStorage;
+ private final WriteRecorder writeRecorder;
- // TODO(wfarner): It should be possible to remove this flag now, since all call stacks when
- // recovering are controlled at this layer (they're all calls to Mutable store implementations).
- // The more involved change is changing SnapshotStore to accept a Mutable store provider to
- // avoid a call to Storage.write() when we replay a Snapshot.
- private boolean recovered = false;
private TransactionRecorder transaction = null;
private final SlidingStats writerWaitStats = new SlidingStats("storage_write_lock_wait", "ns");
- private final Map<Op._Fields, Consumer<Op>> transactionReplayActions;
-
@Inject
DurableStorage(
Persistence persistence,
@@ -147,12 +116,6 @@ public class DurableStorage implements NonVolatileStorage {
// we write directly to the writeBehind stores since we are replaying what's already persisted.
// After that, all writes must succeed in Persistence before they may be considered successful.
this.writeBehindStorage = requireNonNull(delegateStorage);
- this.writeBehindSchedulerStore = requireNonNull(schedulerStore);
- this.writeBehindJobStore = requireNonNull(jobStore);
- this.writeBehindTaskStore = requireNonNull(taskStore);
- this.writeBehindQuotaStore = requireNonNull(quotaStore);
- this.writeBehindAttributeStore = requireNonNull(attributeStore);
- this.writeBehindJobUpdateStore = requireNonNull(jobUpdateStore);
this.writeLock = requireNonNull(writeLock);
this.thriftBackfill = requireNonNull(thriftBackfill);
TransactionManager transactionManager = new TransactionManager() {
@@ -166,7 +129,7 @@ public class DurableStorage implements NonVolatileStorage {
transaction.add(op);
}
};
- this.writeAheadStorage = new WriteAheadStorage(
+ this.writeRecorder = new WriteRecorder(
transactionManager,
schedulerStore,
jobStore,
@@ -174,81 +137,8 @@ public class DurableStorage implements NonVolatileStorage {
quotaStore,
attributeStore,
jobUpdateStore,
- LoggerFactory.getLogger(WriteAheadStorage.class),
+ LoggerFactory.getLogger(WriteRecorder.class),
eventSink);
-
- this.transactionReplayActions = buildTransactionReplayActions();
- }
-
- @VisibleForTesting
- final Map<Op._Fields, Consumer<Op>> buildTransactionReplayActions() {
- return ImmutableMap.<Op._Fields, Consumer<Op>>builder()
- .put(
- Op._Fields.SAVE_FRAMEWORK_ID,
- op -> writeBehindSchedulerStore.saveFrameworkId(op.getSaveFrameworkId().getId()))
- .put(Op._Fields.SAVE_CRON_JOB, op -> {
- SaveCronJob cronJob = op.getSaveCronJob();
- writeBehindJobStore.saveAcceptedJob(
- thriftBackfill.backfillJobConfiguration(cronJob.getJobConfig()));
- })
- .put(
- Op._Fields.REMOVE_JOB,
- op -> writeBehindJobStore.removeJob(IJobKey.build(op.getRemoveJob().getJobKey())))
- .put(
- Op._Fields.SAVE_TASKS,
- op -> writeBehindTaskStore.saveTasks(
- thriftBackfill.backfillTasks(op.getSaveTasks().getTasks())))
- .put(
- Op._Fields.REMOVE_TASKS,
- op -> writeBehindTaskStore.deleteTasks(op.getRemoveTasks().getTaskIds()))
- .put(Op._Fields.SAVE_QUOTA, op -> {
- SaveQuota saveQuota = op.getSaveQuota();
- writeBehindQuotaStore.saveQuota(
- saveQuota.getRole(),
- ThriftBackfill.backfillResourceAggregate(saveQuota.getQuota()));
- })
- .put(
- Op._Fields.REMOVE_QUOTA,
- op -> writeBehindQuotaStore.removeQuota(op.getRemoveQuota().getRole()))
- .put(Op._Fields.SAVE_HOST_ATTRIBUTES, op -> {
- HostAttributes attributes = op.getSaveHostAttributes().getHostAttributes();
- // Prior to commit 5cf760b, the store would persist maintenance mode changes for
- // unknown hosts. 5cf760b began rejecting these, but the storage may still
- // contain entries with a null slave ID.
- if (attributes.isSetSlaveId()) {
- writeBehindAttributeStore.saveHostAttributes(IHostAttributes.build(attributes));
- } else {
- LOG.info("Dropping host attributes with no agent ID: " + attributes);
- }
- })
- .put(
- Op._Fields.SAVE_LOCK, // TODO(jly): Deprecated, remove in 0.21. See AURORA-1959.
- op -> { /* no-op */ })
- .put(
- Op._Fields.REMOVE_LOCK, // TODO(jly): Deprecated, remove in 0.21. See AURORA-1959.
- op -> { /* no-op */ })
- .put(Op._Fields.SAVE_JOB_UPDATE, op ->
- writeBehindJobUpdateStore.saveJobUpdate(
- thriftBackfill.backFillJobUpdate(op.getSaveJobUpdate().getJobUpdate())))
- .put(Op._Fields.SAVE_JOB_UPDATE_EVENT, op -> {
- SaveJobUpdateEvent event = op.getSaveJobUpdateEvent();
- writeBehindJobUpdateStore.saveJobUpdateEvent(
- IJobUpdateKey.build(event.getKey()),
- IJobUpdateEvent.build(op.getSaveJobUpdateEvent().getEvent()));
- })
- .put(Op._Fields.SAVE_JOB_INSTANCE_UPDATE_EVENT, op -> {
- SaveJobInstanceUpdateEvent event = op.getSaveJobInstanceUpdateEvent();
- writeBehindJobUpdateStore.saveJobInstanceUpdateEvent(
- IJobUpdateKey.build(event.getKey()),
- IJobInstanceUpdateEvent.build(op.getSaveJobInstanceUpdateEvent().getEvent()));
- })
- .put(Op._Fields.PRUNE_JOB_UPDATE_HISTORY, op -> {
- LOG.info("Dropping prune operation. Updates will be pruned later.");
- })
- .put(Op._Fields.REMOVE_JOB_UPDATE, op ->
- writeBehindJobUpdateStore.removeJobUpdates(
- IJobUpdateKey.setFromBuilders(op.getRemoveJobUpdate().getKeys())))
- .build();
}
@Override
@@ -260,18 +150,18 @@ public class DurableStorage implements NonVolatileStorage {
@Override
@Timed("scheduler_storage_start")
- public synchronized void start(final MutateWork.NoResult.Quiet initializationLogic) {
- write((NoResult.Quiet) unused -> {
- // Must have the underlying storage started so we can query it.
- // We replay these entries in the forwarded storage system's transactions but not ours - we
- // do not want to re-record these ops.
- recover();
- recovered = true;
+ public void start(final MutateWork.NoResult.Quiet initializationLogic) {
+ writeLock.lock();
+ try {
+ // We recover directly into the forwarded system to avoid persisting replayed operations.
+ writeBehindStorage.write((NoResult.Quiet) this::recover);
// Now that we're recovered we should persist any mutations done in initializationLogic, so
// run it in one of our transactions.
write(initializationLogic);
- });
+ } finally {
+ writeLock.unlock();
+ }
}
@Override
@@ -280,9 +170,9 @@ public class DurableStorage implements NonVolatileStorage {
}
@Timed("scheduler_storage_recover")
- void recover() throws RecoveryFailedException {
+ void recover(MutableStoreProvider stores) throws RecoveryFailedException {
try {
- persistence.recover().forEach(DurableStorage.this::replayOp);
+ Loader.load(stores, thriftBackfill, persistence.recover());
} catch (PersistenceException e) {
throw new RecoveryFailedException(e);
}
@@ -294,28 +184,19 @@ public class DurableStorage implements NonVolatileStorage {
}
}
- private void replayOp(Op op) {
- Op._Fields opField = op.getSetField();
- if (!transactionReplayActions.containsKey(opField)) {
- throw new IllegalStateException("Unknown transaction op: " + opField);
- }
-
- transactionReplayActions.get(opField).accept(op);
- }
-
private <T, E extends Exception> T doInTransaction(final MutateWork<T, E> work)
throws StorageException, E {
// The transaction has already been set up so we just need to delegate with our store provider
// so any mutations may be persisted.
if (transaction != null) {
- return work.apply(writeAheadStorage);
+ return work.apply(writeRecorder);
}
transaction = new TransactionRecorder();
try {
return writeBehindStorage.write(unused -> {
- T result = work.apply(writeAheadStorage);
+ T result = work.apply(writeRecorder);
List<Op> ops = transaction.getOps();
if (!ops.isEmpty()) {
try {
@@ -337,12 +218,6 @@ public class DurableStorage implements NonVolatileStorage {
writeLock.lock();
try {
writerWaitStats.accumulate(System.nanoTime() - waitStart);
- // We don't want to persist when recovering, we just want to update the underlying
- // store - so pass mutations straight through to the underlying storage.
- if (!recovered) {
- return writeBehindStorage.write(work);
- }
-
return doInTransaction(work);
} finally {
writeLock.unlock();
http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/main/java/org/apache/aurora/scheduler/storage/durability/Loader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/Loader.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/Loader.java
new file mode 100644
index 0000000..10864f1
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/durability/Loader.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.durability;
+
+import java.util.stream.Stream;
+
+import org.apache.aurora.gen.HostAttributes;
+import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.gen.storage.SaveJobInstanceUpdateEvent;
+import org.apache.aurora.gen.storage.SaveJobUpdateEvent;
+import org.apache.aurora.gen.storage.SaveQuota;
+import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import org.apache.aurora.scheduler.storage.durability.Persistence.Edit;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class Loader {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Loader.class);
+
+ private Loader() {
+ // Utility class.
+ }
+
+ /**
+ * Loads a sequence of storage operations into the provided stores, applying backfills.
+ *
+ * @param stores Stores to populate.
+ * @param backfill Backfill mechanism to use.
+ * @param edits Edits to apply.
+ */
+ public static void load(
+ MutableStoreProvider stores,
+ ThriftBackfill backfill,
+ Stream<Edit> edits) {
+
+ edits.forEach(edit -> load(stores, backfill, edit));
+ }
+
+ private static void load(MutableStoreProvider stores, ThriftBackfill backfill, Edit edit) {
+ if (edit.isDeleteAll()) {
+ LOG.info("Resetting storage");
+ stores.getCronJobStore().deleteJobs();
+ stores.getUnsafeTaskStore().deleteAllTasks();
+ stores.getQuotaStore().deleteQuotas();
+ stores.getAttributeStore().deleteHostAttributes();
+ stores.getJobUpdateStore().deleteAllUpdates();
+ return;
+ }
+
+ Op op = edit.getOp();
+ switch (op.getSetField()) {
+ case SAVE_FRAMEWORK_ID:
+ stores.getSchedulerStore().saveFrameworkId(op.getSaveFrameworkId().getId());
+ break;
+
+ case SAVE_CRON_JOB:
+ stores.getCronJobStore().saveAcceptedJob(
+ backfill.backfillJobConfiguration(op.getSaveCronJob().getJobConfig()));
+ break;
+
+ case REMOVE_JOB:
+ stores.getCronJobStore().removeJob(IJobKey.build(op.getRemoveJob().getJobKey()));
+ break;
+
+ case REMOVE_LOCK:
+ case SAVE_LOCK:
+ // TODO(jly): Deprecated, remove in 0.21. See AURORA-1959.
+ break;
+
+ case SAVE_TASKS:
+ stores.getUnsafeTaskStore().saveTasks(backfill.backfillTasks(op.getSaveTasks().getTasks()));
+ break;
+
+ case REMOVE_TASKS:
+ stores.getUnsafeTaskStore().deleteTasks(op.getRemoveTasks().getTaskIds());
+ break;
+
+ case SAVE_QUOTA:
+ SaveQuota saveQuota = op.getSaveQuota();
+ stores.getQuotaStore().saveQuota(
+ saveQuota.getRole(),
+ ThriftBackfill.backfillResourceAggregate(saveQuota.getQuota()));
+ break;
+
+ case REMOVE_QUOTA:
+ stores.getQuotaStore().removeQuota(op.getRemoveQuota().getRole());
+ break;
+
+ case SAVE_HOST_ATTRIBUTES:
+ HostAttributes attributes = op.getSaveHostAttributes().getHostAttributes();
+ // Prior to commit 5cf760b, the store would persist maintenance mode changes for
+ // unknown hosts. 5cf760b began rejecting these, but the storage may still
+ // contain entries with a null slave ID.
+ if (attributes.isSetSlaveId()) {
+ stores.getAttributeStore().saveHostAttributes(IHostAttributes.build(attributes));
+ } else {
+ LOG.info("Dropping host attributes with no agent ID: " + attributes);
+ }
+ break;
+
+ case SAVE_JOB_UPDATE:
+ stores.getJobUpdateStore().saveJobUpdate(
+ backfill.backFillJobUpdate(op.getSaveJobUpdate().getJobUpdate()));
+ break;
+
+ case SAVE_JOB_UPDATE_EVENT:
+ SaveJobUpdateEvent jobEvent = op.getSaveJobUpdateEvent();
+ stores.getJobUpdateStore().saveJobUpdateEvent(
+ IJobUpdateKey.build(jobEvent.getKey()),
+ IJobUpdateEvent.build(op.getSaveJobUpdateEvent().getEvent()));
+ break;
+
+ case SAVE_JOB_INSTANCE_UPDATE_EVENT:
+ SaveJobInstanceUpdateEvent instanceEvent = op.getSaveJobInstanceUpdateEvent();
+ stores.getJobUpdateStore().saveJobInstanceUpdateEvent(
+ IJobUpdateKey.build(instanceEvent.getKey()),
+ IJobInstanceUpdateEvent.build(op.getSaveJobInstanceUpdateEvent().getEvent()));
+ break;
+
+ case PRUNE_JOB_UPDATE_HISTORY:
+ LOG.info("Dropping prune operation. Updates will be pruned later.");
+ break;
+
+ case REMOVE_JOB_UPDATE:
+ stores.getJobUpdateStore().removeJobUpdates(
+ IJobUpdateKey.setFromBuilders(op.getRemoveJobUpdate().getKeys()));
+ break;
+
+ default:
+ throw new IllegalArgumentException("Unrecognized op type " + op.getSetField());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/main/java/org/apache/aurora/scheduler/storage/durability/Persistence.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/Persistence.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/Persistence.java
index 9eb862c..4476d90 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/durability/Persistence.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/durability/Persistence.java
@@ -13,10 +13,15 @@
*/
package org.apache.aurora.scheduler.storage.durability;
+import java.util.Objects;
import java.util.stream.Stream;
+import javax.annotation.Nullable;
+
import org.apache.aurora.gen.storage.Op;
+import static java.util.Objects.requireNonNull;
+
/**
* Persistence layer for storage operations.
*/
@@ -31,10 +36,10 @@ public interface Persistence {
/**
* Recovers previously-persisted records.
*
- * @return All persisted records.
+ * @return All edits to apply.
* @throws PersistenceException If recovery failed.
*/
- Stream<Op> recover() throws PersistenceException;
+ Stream<Edit> recover() throws PersistenceException;
/**
* Saves new records. No records may be considered durably saved until this method returns
@@ -46,6 +51,53 @@ public interface Persistence {
void persist(Stream<Op> records) throws PersistenceException;
/**
+ * An edit to apply when recovering from persistence.
+ */
+ class Edit {
+ @Nullable private final Op op;
+
+ private Edit(@Nullable Op op) {
+ this.op = op;
+ }
+
+ public static Edit op(Op op) {
+ return new Edit(requireNonNull(op));
+ }
+
+ public static Edit deleteAll() {
+ return new Edit(null);
+ }
+
+ public boolean isDeleteAll() {
+ return op == null;
+ }
+
+ public Op getOp() {
+ return requireNonNull(op);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof Edit)) {
+ return false;
+ }
+
+ Edit other = (Edit) obj;
+ return Objects.equals(op, other.op);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(op);
+ }
+
+ @Override
+ public String toString() {
+ return Objects.toString(op);
+ }
+ }
+
+ /**
* Thrown when a persistence operation fails.
*/
class PersistenceException extends Exception {
http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorage.java
deleted file mode 100644
index 667db06..0000000
--- a/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorage.java
+++ /dev/null
@@ -1,368 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage.durability;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableSet;
-
-import org.apache.aurora.gen.storage.Op;
-import org.apache.aurora.gen.storage.RemoveJob;
-import org.apache.aurora.gen.storage.RemoveQuota;
-import org.apache.aurora.gen.storage.RemoveTasks;
-import org.apache.aurora.gen.storage.SaveCronJob;
-import org.apache.aurora.gen.storage.SaveFrameworkId;
-import org.apache.aurora.gen.storage.SaveHostAttributes;
-import org.apache.aurora.gen.storage.SaveJobInstanceUpdateEvent;
-import org.apache.aurora.gen.storage.SaveJobUpdate;
-import org.apache.aurora.gen.storage.SaveJobUpdateEvent;
-import org.apache.aurora.gen.storage.SaveQuota;
-import org.apache.aurora.gen.storage.SaveTasks;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.events.EventSink;
-import org.apache.aurora.scheduler.events.PubsubEvent;
-import org.apache.aurora.scheduler.storage.AttributeStore;
-import org.apache.aurora.scheduler.storage.CronJobStore;
-import org.apache.aurora.scheduler.storage.JobUpdateStore;
-import org.apache.aurora.scheduler.storage.QuotaStore;
-import org.apache.aurora.scheduler.storage.SchedulerStore;
-import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.TaskStore;
-import org.apache.aurora.scheduler.storage.durability.DurableStorage.TransactionManager;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
-import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery;
-import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.slf4j.Logger;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Mutable stores implementation that translates all operations to {@link Op}s (which are passed
- * to a provided {@link TransactionManager}) before forwarding the operations to delegate mutable
- * stores.
- */
-public class WriteAheadStorage implements
- MutableStoreProvider,
- SchedulerStore.Mutable,
- CronJobStore.Mutable,
- TaskStore.Mutable,
- QuotaStore.Mutable,
- AttributeStore.Mutable,
- JobUpdateStore.Mutable {
-
- private final TransactionManager transactionManager;
- private final SchedulerStore.Mutable schedulerStore;
- private final CronJobStore.Mutable jobStore;
- private final TaskStore.Mutable taskStore;
- private final QuotaStore.Mutable quotaStore;
- private final AttributeStore.Mutable attributeStore;
- private final JobUpdateStore.Mutable jobUpdateStore;
- private final Logger log;
- private final EventSink eventSink;
-
- /**
- * Creates a new write-ahead storage that delegates to the providing default stores.
- *
- * @param transactionManager External controller for transaction operations.
- * @param schedulerStore Delegate.
- * @param jobStore Delegate.
- * @param taskStore Delegate.
- * @param quotaStore Delegate.
- * @param attributeStore Delegate.
- * @param jobUpdateStore Delegate.
- */
- public WriteAheadStorage(
- TransactionManager transactionManager,
- SchedulerStore.Mutable schedulerStore,
- CronJobStore.Mutable jobStore,
- TaskStore.Mutable taskStore,
- QuotaStore.Mutable quotaStore,
- AttributeStore.Mutable attributeStore,
- JobUpdateStore.Mutable jobUpdateStore,
- Logger log,
- EventSink eventSink) {
-
- this.transactionManager = requireNonNull(transactionManager);
- this.schedulerStore = requireNonNull(schedulerStore);
- this.jobStore = requireNonNull(jobStore);
- this.taskStore = requireNonNull(taskStore);
- this.quotaStore = requireNonNull(quotaStore);
- this.attributeStore = requireNonNull(attributeStore);
- this.jobUpdateStore = requireNonNull(jobUpdateStore);
- this.log = requireNonNull(log);
- this.eventSink = requireNonNull(eventSink);
- }
-
- private void write(Op op) {
- Preconditions.checkState(
- transactionManager.hasActiveTransaction(),
- "Mutating operations must be within a transaction.");
- transactionManager.log(op);
- }
-
- @Override
- public void saveFrameworkId(final String frameworkId) {
- requireNonNull(frameworkId);
-
- write(Op.saveFrameworkId(new SaveFrameworkId(frameworkId)));
- schedulerStore.saveFrameworkId(frameworkId);
- }
-
- @Override
- public void deleteTasks(final Set<String> taskIds) {
- requireNonNull(taskIds);
-
- write(Op.removeTasks(new RemoveTasks(taskIds)));
- taskStore.deleteTasks(taskIds);
- }
-
- @Override
- public void saveTasks(final Set<IScheduledTask> newTasks) {
- requireNonNull(newTasks);
-
- write(Op.saveTasks(new SaveTasks(IScheduledTask.toBuildersSet(newTasks))));
- taskStore.saveTasks(newTasks);
- }
-
- @Override
- public Optional<IScheduledTask> mutateTask(
- String taskId,
- Function<IScheduledTask, IScheduledTask> mutator) {
-
- Optional<IScheduledTask> mutated = taskStore.mutateTask(taskId, mutator);
- log.debug("Storing updated task to log: {}={}", taskId, mutated.get().getStatus());
- write(Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))));
-
- return mutated;
- }
-
- @Override
- public void saveQuota(final String role, final IResourceAggregate quota) {
- requireNonNull(role);
- requireNonNull(quota);
-
- write(Op.saveQuota(new SaveQuota(role, quota.newBuilder())));
- quotaStore.saveQuota(role, quota);
- }
-
- @Override
- public boolean saveHostAttributes(final IHostAttributes attrs) {
- requireNonNull(attrs);
-
- boolean changed = attributeStore.saveHostAttributes(attrs);
- if (changed) {
- write(Op.saveHostAttributes(new SaveHostAttributes(attrs.newBuilder())));
- eventSink.post(new PubsubEvent.HostAttributesChanged(attrs));
- }
- return changed;
- }
-
- @Override
- public void removeJob(final IJobKey jobKey) {
- requireNonNull(jobKey);
-
- write(Op.removeJob(new RemoveJob().setJobKey(jobKey.newBuilder())));
- jobStore.removeJob(jobKey);
- }
-
- @Override
- public void saveAcceptedJob(final IJobConfiguration jobConfig) {
- requireNonNull(jobConfig);
-
- write(Op.saveCronJob(new SaveCronJob(jobConfig.newBuilder())));
- jobStore.saveAcceptedJob(jobConfig);
- }
-
- @Override
- public void removeQuota(final String role) {
- requireNonNull(role);
-
- write(Op.removeQuota(new RemoveQuota(role)));
- quotaStore.removeQuota(role);
- }
-
- @Override
- public void saveJobUpdate(IJobUpdate update) {
- requireNonNull(update);
-
- write(Op.saveJobUpdate(new SaveJobUpdate().setJobUpdate(update.newBuilder())));
- jobUpdateStore.saveJobUpdate(update);
- }
-
- @Override
- public void saveJobUpdateEvent(IJobUpdateKey key, IJobUpdateEvent event) {
- requireNonNull(key);
- requireNonNull(event);
-
- write(Op.saveJobUpdateEvent(new SaveJobUpdateEvent(event.newBuilder(), key.newBuilder())));
- jobUpdateStore.saveJobUpdateEvent(key, event);
- }
-
- @Override
- public void saveJobInstanceUpdateEvent(IJobUpdateKey key, IJobInstanceUpdateEvent event) {
- requireNonNull(key);
- requireNonNull(event);
-
- write(Op.saveJobInstanceUpdateEvent(
- new SaveJobInstanceUpdateEvent(event.newBuilder(), key.newBuilder())));
- jobUpdateStore.saveJobInstanceUpdateEvent(key, event);
- }
-
- @Override
- public void removeJobUpdates(Set<IJobUpdateKey> keys) {
- requireNonNull(keys);
-
- // Compatibility mode - RemoveJobUpdates is not yet written since older versions cannot
- // read it. JobUpdates are only removed implicitly when a snapshot is taken.
- jobUpdateStore.removeJobUpdates(keys);
- }
-
- @Override
- public void deleteAllTasks() {
- throw new UnsupportedOperationException(
- "Unsupported since casual storage users should never be doing this.");
- }
-
- @Override
- public void deleteHostAttributes() {
- throw new UnsupportedOperationException(
- "Unsupported since casual storage users should never be doing this.");
- }
-
- @Override
- public void deleteJobs() {
- throw new UnsupportedOperationException(
- "Unsupported since casual storage users should never be doing this.");
- }
-
- @Override
- public void deleteQuotas() {
- throw new UnsupportedOperationException(
- "Unsupported since casual storage users should never be doing this.");
- }
-
- @Override
- public void deleteAllUpdates() {
- throw new UnsupportedOperationException(
- "Unsupported since casual storage users should never be doing this.");
- }
-
- @Override
- public SchedulerStore.Mutable getSchedulerStore() {
- return this;
- }
-
- @Override
- public CronJobStore.Mutable getCronJobStore() {
- return this;
- }
-
- @Override
- public TaskStore.Mutable getUnsafeTaskStore() {
- return this;
- }
-
- @Override
- public QuotaStore.Mutable getQuotaStore() {
- return this;
- }
-
- @Override
- public AttributeStore.Mutable getAttributeStore() {
- return this;
- }
-
- @Override
- public TaskStore getTaskStore() {
- return this;
- }
-
- @Override
- public JobUpdateStore.Mutable getJobUpdateStore() {
- return this;
- }
-
- @Override
- public Optional<String> fetchFrameworkId() {
- return this.schedulerStore.fetchFrameworkId();
- }
-
- @Override
- public Iterable<IJobConfiguration> fetchJobs() {
- return this.jobStore.fetchJobs();
- }
-
- @Override
- public Optional<IJobConfiguration> fetchJob(IJobKey jobKey) {
- return this.jobStore.fetchJob(jobKey);
- }
-
- @Override
- public Optional<IScheduledTask> fetchTask(String taskId) {
- return this.taskStore.fetchTask(taskId);
- }
-
- @Override
- public Iterable<IScheduledTask> fetchTasks(Query.Builder query) {
- return this.taskStore.fetchTasks(query);
- }
-
- @Override
- public Set<IJobKey> getJobKeys() {
- return this.taskStore.getJobKeys();
- }
-
- @Override
- public Optional<IResourceAggregate> fetchQuota(String role) {
- return this.quotaStore.fetchQuota(role);
- }
-
- @Override
- public Map<String, IResourceAggregate> fetchQuotas() {
- return this.quotaStore.fetchQuotas();
- }
-
- @Override
- public Optional<IHostAttributes> getHostAttributes(String host) {
- return this.attributeStore.getHostAttributes(host);
- }
-
- @Override
- public Set<IHostAttributes> getHostAttributes() {
- return this.attributeStore.getHostAttributes();
- }
-
- @Override
- public List<IJobUpdateDetails> fetchJobUpdates(IJobUpdateQuery query) {
- return this.jobUpdateStore.fetchJobUpdates(query);
- }
-
- @Override
- public Optional<IJobUpdateDetails> fetchJobUpdate(IJobUpdateKey key) {
- return this.jobUpdateStore.fetchJobUpdate(key);
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteRecorder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteRecorder.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteRecorder.java
new file mode 100644
index 0000000..5ae834a
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteRecorder.java
@@ -0,0 +1,368 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.durability;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.gen.storage.RemoveJob;
+import org.apache.aurora.gen.storage.RemoveQuota;
+import org.apache.aurora.gen.storage.RemoveTasks;
+import org.apache.aurora.gen.storage.SaveCronJob;
+import org.apache.aurora.gen.storage.SaveFrameworkId;
+import org.apache.aurora.gen.storage.SaveHostAttributes;
+import org.apache.aurora.gen.storage.SaveJobInstanceUpdateEvent;
+import org.apache.aurora.gen.storage.SaveJobUpdate;
+import org.apache.aurora.gen.storage.SaveJobUpdateEvent;
+import org.apache.aurora.gen.storage.SaveQuota;
+import org.apache.aurora.gen.storage.SaveTasks;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.events.EventSink;
+import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.storage.AttributeStore;
+import org.apache.aurora.scheduler.storage.CronJobStore;
+import org.apache.aurora.scheduler.storage.JobUpdateStore;
+import org.apache.aurora.scheduler.storage.QuotaStore;
+import org.apache.aurora.scheduler.storage.SchedulerStore;
+import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import org.apache.aurora.scheduler.storage.TaskStore;
+import org.apache.aurora.scheduler.storage.durability.DurableStorage.TransactionManager;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
+import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery;
+import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.slf4j.Logger;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Mutable stores implementation that translates all operations to {@link Op}s (which are passed
+ * to a provided {@link TransactionManager}) before forwarding the operations to delegate mutable
+ * stores.
+ */
+public class WriteRecorder implements
+ MutableStoreProvider,
+ SchedulerStore.Mutable,
+ CronJobStore.Mutable,
+ TaskStore.Mutable,
+ QuotaStore.Mutable,
+ AttributeStore.Mutable,
+ JobUpdateStore.Mutable {
+
+ private final TransactionManager transactionManager;
+ private final SchedulerStore.Mutable schedulerStore;
+ private final CronJobStore.Mutable jobStore;
+ private final TaskStore.Mutable taskStore;
+ private final QuotaStore.Mutable quotaStore;
+ private final AttributeStore.Mutable attributeStore;
+ private final JobUpdateStore.Mutable jobUpdateStore;
+ private final Logger log;
+ private final EventSink eventSink;
+
+ /**
+ * Creates a new write-ahead storage that delegates to the providing default stores.
+ *
+ * @param transactionManager External controller for transaction operations.
+ * @param schedulerStore Delegate.
+ * @param jobStore Delegate.
+ * @param taskStore Delegate.
+ * @param quotaStore Delegate.
+ * @param attributeStore Delegate.
+ * @param jobUpdateStore Delegate.
+ */
+ public WriteRecorder(
+ TransactionManager transactionManager,
+ SchedulerStore.Mutable schedulerStore,
+ CronJobStore.Mutable jobStore,
+ TaskStore.Mutable taskStore,
+ QuotaStore.Mutable quotaStore,
+ AttributeStore.Mutable attributeStore,
+ JobUpdateStore.Mutable jobUpdateStore,
+ Logger log,
+ EventSink eventSink) {
+
+ this.transactionManager = requireNonNull(transactionManager);
+ this.schedulerStore = requireNonNull(schedulerStore);
+ this.jobStore = requireNonNull(jobStore);
+ this.taskStore = requireNonNull(taskStore);
+ this.quotaStore = requireNonNull(quotaStore);
+ this.attributeStore = requireNonNull(attributeStore);
+ this.jobUpdateStore = requireNonNull(jobUpdateStore);
+ this.log = requireNonNull(log);
+ this.eventSink = requireNonNull(eventSink);
+ }
+
+ private void write(Op op) {
+ Preconditions.checkState(
+ transactionManager.hasActiveTransaction(),
+ "Mutating operations must be within a transaction.");
+ transactionManager.log(op);
+ }
+
+ @Override
+ public void saveFrameworkId(final String frameworkId) {
+ requireNonNull(frameworkId);
+
+ write(Op.saveFrameworkId(new SaveFrameworkId(frameworkId)));
+ schedulerStore.saveFrameworkId(frameworkId);
+ }
+
+ @Override
+ public void deleteTasks(final Set<String> taskIds) {
+ requireNonNull(taskIds);
+
+ write(Op.removeTasks(new RemoveTasks(taskIds)));
+ taskStore.deleteTasks(taskIds);
+ }
+
+ @Override
+ public void saveTasks(final Set<IScheduledTask> newTasks) {
+ requireNonNull(newTasks);
+
+ write(Op.saveTasks(new SaveTasks(IScheduledTask.toBuildersSet(newTasks))));
+ taskStore.saveTasks(newTasks);
+ }
+
+ @Override
+ public Optional<IScheduledTask> mutateTask(
+ String taskId,
+ Function<IScheduledTask, IScheduledTask> mutator) {
+
+ Optional<IScheduledTask> mutated = taskStore.mutateTask(taskId, mutator);
+ log.debug("Storing updated task to log: {}={}", taskId, mutated.get().getStatus());
+ write(Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))));
+
+ return mutated;
+ }
+
+ @Override
+ public void saveQuota(final String role, final IResourceAggregate quota) {
+ requireNonNull(role);
+ requireNonNull(quota);
+
+ write(Op.saveQuota(new SaveQuota(role, quota.newBuilder())));
+ quotaStore.saveQuota(role, quota);
+ }
+
+ @Override
+ public boolean saveHostAttributes(final IHostAttributes attrs) {
+ requireNonNull(attrs);
+
+ boolean changed = attributeStore.saveHostAttributes(attrs);
+ if (changed) {
+ write(Op.saveHostAttributes(new SaveHostAttributes(attrs.newBuilder())));
+ eventSink.post(new PubsubEvent.HostAttributesChanged(attrs));
+ }
+ return changed;
+ }
+
+ @Override
+ public void removeJob(final IJobKey jobKey) {
+ requireNonNull(jobKey);
+
+ write(Op.removeJob(new RemoveJob().setJobKey(jobKey.newBuilder())));
+ jobStore.removeJob(jobKey);
+ }
+
+ @Override
+ public void saveAcceptedJob(final IJobConfiguration jobConfig) {
+ requireNonNull(jobConfig);
+
+ write(Op.saveCronJob(new SaveCronJob(jobConfig.newBuilder())));
+ jobStore.saveAcceptedJob(jobConfig);
+ }
+
+ @Override
+ public void removeQuota(final String role) {
+ requireNonNull(role);
+
+ write(Op.removeQuota(new RemoveQuota(role)));
+ quotaStore.removeQuota(role);
+ }
+
+ @Override
+ public void saveJobUpdate(IJobUpdate update) {
+ requireNonNull(update);
+
+ write(Op.saveJobUpdate(new SaveJobUpdate().setJobUpdate(update.newBuilder())));
+ jobUpdateStore.saveJobUpdate(update);
+ }
+
+ @Override
+ public void saveJobUpdateEvent(IJobUpdateKey key, IJobUpdateEvent event) {
+ requireNonNull(key);
+ requireNonNull(event);
+
+ write(Op.saveJobUpdateEvent(new SaveJobUpdateEvent(event.newBuilder(), key.newBuilder())));
+ jobUpdateStore.saveJobUpdateEvent(key, event);
+ }
+
+ @Override
+ public void saveJobInstanceUpdateEvent(IJobUpdateKey key, IJobInstanceUpdateEvent event) {
+ requireNonNull(key);
+ requireNonNull(event);
+
+ write(Op.saveJobInstanceUpdateEvent(
+ new SaveJobInstanceUpdateEvent(event.newBuilder(), key.newBuilder())));
+ jobUpdateStore.saveJobInstanceUpdateEvent(key, event);
+ }
+
+ @Override
+ public void removeJobUpdates(Set<IJobUpdateKey> keys) {
+ requireNonNull(keys);
+
+ // Compatibility mode - RemoveJobUpdates is not yet written since older versions cannot
+ // read it. JobUpdates are only removed implicitly when a snapshot is taken.
+ jobUpdateStore.removeJobUpdates(keys);
+ }
+
+ @Override
+ public void deleteAllTasks() {
+ throw new UnsupportedOperationException(
+ "Unsupported since casual storage users should never be doing this.");
+ }
+
+ @Override
+ public void deleteHostAttributes() {
+ throw new UnsupportedOperationException(
+ "Unsupported since casual storage users should never be doing this.");
+ }
+
+ @Override
+ public void deleteJobs() {
+ throw new UnsupportedOperationException(
+ "Unsupported since casual storage users should never be doing this.");
+ }
+
+ @Override
+ public void deleteQuotas() {
+ throw new UnsupportedOperationException(
+ "Unsupported since casual storage users should never be doing this.");
+ }
+
+ @Override
+ public void deleteAllUpdates() {
+ throw new UnsupportedOperationException(
+ "Unsupported since casual storage users should never be doing this.");
+ }
+
+ @Override
+ public SchedulerStore.Mutable getSchedulerStore() {
+ return this;
+ }
+
+ @Override
+ public CronJobStore.Mutable getCronJobStore() {
+ return this;
+ }
+
+ @Override
+ public TaskStore.Mutable getUnsafeTaskStore() {
+ return this;
+ }
+
+ @Override
+ public QuotaStore.Mutable getQuotaStore() {
+ return this;
+ }
+
+ @Override
+ public AttributeStore.Mutable getAttributeStore() {
+ return this;
+ }
+
+ @Override
+ public TaskStore getTaskStore() {
+ return this;
+ }
+
+ @Override
+ public JobUpdateStore.Mutable getJobUpdateStore() {
+ return this;
+ }
+
+ @Override
+ public Optional<String> fetchFrameworkId() {
+ return this.schedulerStore.fetchFrameworkId();
+ }
+
+ @Override
+ public Iterable<IJobConfiguration> fetchJobs() {
+ return this.jobStore.fetchJobs();
+ }
+
+ @Override
+ public Optional<IJobConfiguration> fetchJob(IJobKey jobKey) {
+ return this.jobStore.fetchJob(jobKey);
+ }
+
+ @Override
+ public Optional<IScheduledTask> fetchTask(String taskId) {
+ return this.taskStore.fetchTask(taskId);
+ }
+
+ @Override
+ public Iterable<IScheduledTask> fetchTasks(Query.Builder query) {
+ return this.taskStore.fetchTasks(query);
+ }
+
+ @Override
+ public Set<IJobKey> getJobKeys() {
+ return this.taskStore.getJobKeys();
+ }
+
+ @Override
+ public Optional<IResourceAggregate> fetchQuota(String role) {
+ return this.quotaStore.fetchQuota(role);
+ }
+
+ @Override
+ public Map<String, IResourceAggregate> fetchQuotas() {
+ return this.quotaStore.fetchQuotas();
+ }
+
+ @Override
+ public Optional<IHostAttributes> getHostAttributes(String host) {
+ return this.attributeStore.getHostAttributes(host);
+ }
+
+ @Override
+ public Set<IHostAttributes> getHostAttributes() {
+ return this.attributeStore.getHostAttributes();
+ }
+
+ @Override
+ public List<IJobUpdateDetails> fetchJobUpdates(IJobUpdateQuery query) {
+ return this.jobUpdateStore.fetchJobUpdates(query);
+ }
+
+ @Override
+ public Optional<IJobUpdateDetails> fetchJobUpdate(IJobUpdateKey key) {
+ return this.jobUpdateStore.fetchJobUpdate(key);
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistence.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistence.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistence.java
index e70e605..8ca3169 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistence.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistence.java
@@ -16,31 +16,19 @@ package org.apache.aurora.scheduler.storage.log;
import java.io.IOException;
import java.util.Date;
import java.util.Iterator;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.inject.Inject;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.MoreExecutors;
-
import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
-import org.apache.aurora.common.application.ShutdownRegistry;
-import org.apache.aurora.common.inject.TimedInterceptor.Timed;
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
import org.apache.aurora.gen.storage.LogEntry;
import org.apache.aurora.gen.storage.Op;
import org.apache.aurora.gen.storage.Snapshot;
-import org.apache.aurora.scheduler.base.AsyncUtil;
import org.apache.aurora.scheduler.log.Log.Stream.InvalidPositionException;
import org.apache.aurora.scheduler.log.Log.Stream.StreamAccessException;
-import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
-import org.apache.aurora.scheduler.storage.SnapshotStore;
-import org.apache.aurora.scheduler.storage.Storage.StorageException;
+import org.apache.aurora.scheduler.storage.Snapshotter;
import org.apache.aurora.scheduler.storage.durability.Persistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,42 +38,18 @@ import static java.util.Objects.requireNonNull;
/**
* Persistence layer that uses a replicated log.
*/
-class LogPersistence implements Persistence, DistributedSnapshotStore {
+class LogPersistence implements Persistence {
private static final Logger LOG = LoggerFactory.getLogger(LogPersistence.class);
private final LogManager logManager;
- private final SnapshotStore<Snapshot> snapshotStore;
- private final SchedulingService schedulingService;
- private final Amount<Long, Time> snapshotInterval;
+ private final Snapshotter snapshotter;
private StreamManager streamManager;
@Inject
- LogPersistence(
- Settings settings,
- LogManager logManager,
- SnapshotStore<Snapshot> snapshotStore,
- ShutdownRegistry shutdownRegistry) {
-
- this(new ScheduledExecutorSchedulingService(
- shutdownRegistry,
- settings.getShutdownGracePeriod()),
- settings.getSnapshotInterval(),
- logManager,
- snapshotStore);
- }
-
- @VisibleForTesting
- LogPersistence(
- SchedulingService schedulingService,
- Amount<Long, Time> snapshotInterval,
- LogManager logManager,
- SnapshotStore<Snapshot> snapshotStore) {
-
- this.schedulingService = requireNonNull(schedulingService);
- this.snapshotInterval = requireNonNull(snapshotInterval);
+ LogPersistence(LogManager logManager, Snapshotter snapshotter) {
this.logManager = requireNonNull(logManager);
- this.snapshotStore = requireNonNull(snapshotStore);
+ this.snapshotter = requireNonNull(snapshotter);
}
@Override
@@ -98,6 +62,15 @@ class LogPersistence implements Persistence, DistributedSnapshotStore {
}
}
+ /**
+ * Saves a snapshot to the log stream.
+ *
+ * @param snapshot Snapshot to save.
+ */
+ void persist(Snapshot snapshot) {
+ streamManager.snapshot(snapshot);
+ }
+
@Override
public void persist(Stream<Op> mutations) throws PersistenceException {
try {
@@ -108,9 +81,7 @@ class LogPersistence implements Persistence, DistributedSnapshotStore {
}
@Override
- public Stream<Op> recover() throws PersistenceException {
- scheduleSnapshots();
-
+ public Stream<Edit> recover() throws PersistenceException {
try {
Iterator<LogEntry> entries = streamManager.readFromBeginning();
Iterable<LogEntry> iterableEntries = () -> entries;
@@ -118,139 +89,26 @@ class LogPersistence implements Persistence, DistributedSnapshotStore {
return entryStream
.filter(entry -> entry.getSetField() != LogEntry._Fields.NOOP)
- .filter(entry -> {
- if (entry.getSetField() == LogEntry._Fields.SNAPSHOT) {
- Snapshot snapshot = entry.getSnapshot();
- LOG.info("Applying snapshot taken on " + new Date(snapshot.getTimestamp()));
- snapshotStore.applySnapshot(snapshot);
- return false;
- }
- return true;
- })
- .peek(entry -> {
- if (entry.getSetField() != LogEntry._Fields.TRANSACTION) {
- throw new IllegalStateException("Unknown log entry type: " + entry.getSetField());
+ .flatMap(entry -> {
+ switch (entry.getSetField()) {
+ case SNAPSHOT:
+ Snapshot snapshot = entry.getSnapshot();
+ LOG.info("Applying snapshot taken on " + new Date(snapshot.getTimestamp()));
+ return Stream.concat(
+ Stream.of(Edit.deleteAll()),
+ snapshotter.asStream(snapshot)
+ .map(Edit::op));
+
+ case TRANSACTION:
+ return entry.getTransaction().getOps().stream()
+ .map(Edit::op);
+
+ default:
+ throw new IllegalStateException("Unknown log entry type: " + entry.getSetField());
}
- })
- .flatMap(entry -> entry.getTransaction().getOps().stream());
+ });
} catch (CodingException | InvalidPositionException | StreamAccessException e) {
throw new PersistenceException(e);
}
}
-
- private void scheduleSnapshots() {
- if (snapshotInterval.getValue() > 0) {
- schedulingService.doEvery(snapshotInterval, () -> {
- try {
- snapshot();
- } catch (StorageException e) {
- if (e.getCause() == null) {
- LOG.warn("StorageException when attempting to snapshot.", e);
- } else {
- LOG.warn(e.getMessage(), e.getCause());
- }
- }
- });
- }
- }
-
- @Override
- public void snapshot() throws StorageException {
- try {
- doSnapshot();
- } catch (CodingException e) {
- throw new StorageException("Failed to encode a snapshot", e);
- } catch (InvalidPositionException e) {
- throw new StorageException("Saved snapshot but failed to truncate entries preceding it", e);
- } catch (StreamAccessException e) {
- throw new StorageException("Failed to create a snapshot", e);
- }
- }
-
- @Timed("scheduler_log_snapshot_persist")
- @Override
- public void snapshotWith(Snapshot snapshot)
- throws CodingException, InvalidPositionException, StreamAccessException {
-
- streamManager.snapshot(snapshot);
- }
-
- /**
- * Forces a snapshot of the storage state.
- *
- * @throws CodingException If there is a problem encoding the snapshot.
- * @throws InvalidPositionException If the log stream cursor is invalid.
- * @throws StreamAccessException If there is a problem writing the snapshot to the log stream.
- */
- @Timed("scheduler_log_snapshot")
- void doSnapshot() throws CodingException, InvalidPositionException, StreamAccessException {
- LOG.info("Creating snapshot.");
- Snapshot snapshot = snapshotStore.createSnapshot();
- snapshotWith(snapshot);
- LOG.info("Snapshot complete."
- + " host attrs: " + snapshot.getHostAttributesSize()
- + ", cron jobs: " + snapshot.getCronJobsSize()
- + ", quota confs: " + snapshot.getQuotaConfigurationsSize()
- + ", tasks: " + snapshot.getTasksSize()
- + ", updates: " + snapshot.getJobUpdateDetailsSize());
- }
-
- /**
- * A service that can schedule an action to be executed periodically.
- */
- @VisibleForTesting
- interface SchedulingService {
-
- /**
- * Schedules an action to execute periodically.
- *
- * @param interval The time period to wait until running the {@code action} again.
- * @param action The action to execute periodically.
- */
- void doEvery(Amount<Long, Time> interval, Runnable action);
- }
-
- private static class ScheduledExecutorSchedulingService implements SchedulingService {
- private final ScheduledExecutorService scheduledExecutor;
-
- ScheduledExecutorSchedulingService(ShutdownRegistry shutdownRegistry,
- Amount<Long, Time> shutdownGracePeriod) {
- scheduledExecutor = AsyncUtil.singleThreadLoggingScheduledExecutor("LogStorage-%d", LOG);
- shutdownRegistry.addAction(() -> MoreExecutors.shutdownAndAwaitTermination(
- scheduledExecutor,
- shutdownGracePeriod.getValue(),
- shutdownGracePeriod.getUnit().getTimeUnit()));
- }
-
- @Override
- public void doEvery(Amount<Long, Time> interval, Runnable action) {
- requireNonNull(interval);
- requireNonNull(action);
-
- long delay = interval.getValue();
- TimeUnit timeUnit = interval.getUnit().getTimeUnit();
- scheduledExecutor.scheduleWithFixedDelay(action, delay, delay, timeUnit);
- }
- }
-
- /**
- * Configuration settings for log persistence.
- */
- public static class Settings {
- private final Amount<Long, Time> shutdownGracePeriod;
- private final Amount<Long, Time> snapshotInterval;
-
- Settings(Amount<Long, Time> shutdownGracePeriod, Amount<Long, Time> snapshotInterval) {
- this.shutdownGracePeriod = requireNonNull(shutdownGracePeriod);
- this.snapshotInterval = requireNonNull(snapshotInterval);
- }
-
- public Amount<Long, Time> getShutdownGracePeriod() {
- return shutdownGracePeriod;
- }
-
- public Amount<Long, Time> getSnapshotInterval() {
- return snapshotInterval;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/5f79f7ca/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java
index 75ec42a..671593c 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java
@@ -19,6 +19,7 @@ import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
+import com.google.inject.AbstractModule;
import com.google.inject.PrivateModule;
import com.google.inject.TypeLiteral;
import com.google.inject.assistedinject.FactoryModuleBuilder;
@@ -26,33 +27,28 @@ import com.google.inject.assistedinject.FactoryModuleBuilder;
import org.apache.aurora.common.quantity.Amount;
import org.apache.aurora.common.quantity.Data;
import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.scheduler.SchedulerServicesModule;
import org.apache.aurora.scheduler.config.types.DataAmount;
import org.apache.aurora.scheduler.config.types.TimeAmount;
import org.apache.aurora.scheduler.storage.CallOrderEnforcingStorage;
-import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
+import org.apache.aurora.scheduler.storage.SnapshotStore;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
import org.apache.aurora.scheduler.storage.durability.DurableStorage;
import org.apache.aurora.scheduler.storage.durability.Persistence;
+import org.apache.aurora.scheduler.storage.log.EntrySerializer.EntrySerializerImpl;
+import org.apache.aurora.scheduler.storage.log.LogManager.LogEntryHashFunction;
import org.apache.aurora.scheduler.storage.log.LogManager.MaxEntrySize;
-import org.apache.aurora.scheduler.storage.log.LogPersistence.Settings;
-
-import static org.apache.aurora.scheduler.storage.log.EntrySerializer.EntrySerializerImpl;
-import static org.apache.aurora.scheduler.storage.log.LogManager.LogEntryHashFunction;
-import static org.apache.aurora.scheduler.storage.log.SnapshotDeduplicator.SnapshotDeduplicatorImpl;
+import org.apache.aurora.scheduler.storage.log.SnapshotDeduplicator.SnapshotDeduplicatorImpl;
+import org.apache.aurora.scheduler.storage.log.SnapshotService.Settings;
/**
* Bindings for scheduler distributed log based storage.
*/
-public class LogStorageModule extends PrivateModule {
+public class LogStorageModule extends AbstractModule {
@Parameters(separators = "=")
public static class Options {
- @Parameter(names = "-dlog_shutdown_grace_period",
- description = "Specifies the maximum time to wait for scheduled checkpoint and snapshot "
- + "actions to complete before forcibly shutting down.")
- public TimeAmount shutdownGracePeriod = new TimeAmount(2, Time.SECONDS);
-
@Parameter(names = "-dlog_snapshot_interval",
description = "Specifies the frequency at which snapshots of local storage are taken and "
+ "written to the log.")
@@ -73,34 +69,42 @@ public class LogStorageModule extends PrivateModule {
@Override
protected void configure() {
- bind(Settings.class)
- .toInstance(new Settings(options.shutdownGracePeriod, options.snapshotInterval));
-
- bind(new TypeLiteral<Amount<Integer, Data>>() { }).annotatedWith(MaxEntrySize.class)
- .toInstance(options.maxLogEntrySize);
- bind(LogManager.class).in(Singleton.class);
- bind(DurableStorage.class).in(Singleton.class);
-
- install(CallOrderEnforcingStorage.wrappingModule(DurableStorage.class));
- bind(LogPersistence.class).in(Singleton.class);
- bind(Persistence.class).to(LogPersistence.class);
- bind(DistributedSnapshotStore.class).to(LogPersistence.class);
- expose(Persistence.class);
- expose(Storage.class);
- expose(NonVolatileStorage.class);
- expose(DistributedSnapshotStore.class);
-
- bind(EntrySerializer.class).to(EntrySerializerImpl.class);
- // TODO(ksweeney): We don't need a cryptographic checksum here - assess performance of MD5
- // versus a faster error-detection checksum like CRC32 for large Snapshots.
- @SuppressWarnings("deprecation")
- HashFunction hashFunction = Hashing.md5();
- bind(HashFunction.class).annotatedWith(LogEntryHashFunction.class).toInstance(hashFunction);
-
- bind(SnapshotDeduplicator.class).to(SnapshotDeduplicatorImpl.class);
-
- install(new FactoryModuleBuilder()
- .implement(StreamManager.class, StreamManagerImpl.class)
- .build(StreamManagerFactory.class));
+ install(new PrivateModule() {
+ @Override
+ protected void configure() {
+ bind(Settings.class).toInstance(new Settings(options.snapshotInterval));
+
+ bind(new TypeLiteral<Amount<Integer, Data>>() { }).annotatedWith(MaxEntrySize.class)
+ .toInstance(options.maxLogEntrySize);
+ bind(LogManager.class).in(Singleton.class);
+ bind(DurableStorage.class).in(Singleton.class);
+
+ install(CallOrderEnforcingStorage.wrappingModule(DurableStorage.class));
+ bind(LogPersistence.class).in(Singleton.class);
+ bind(Persistence.class).to(LogPersistence.class);
+ bind(SnapshotStore.class).to(SnapshotService.class);
+ bind(SnapshotService.class).in(Singleton.class);
+ expose(SnapshotService.class);
+ expose(Persistence.class);
+ expose(Storage.class);
+ expose(NonVolatileStorage.class);
+ expose(SnapshotStore.class);
+
+ bind(EntrySerializer.class).to(EntrySerializerImpl.class);
+ // TODO(ksweeney): We don't need a cryptographic checksum here - assess performance of MD5
+ // versus a faster error-detection checksum like CRC32 for large Snapshots.
+ @SuppressWarnings("deprecation")
+ HashFunction hashFunction = Hashing.md5();
+ bind(HashFunction.class).annotatedWith(LogEntryHashFunction.class).toInstance(hashFunction);
+
+ bind(SnapshotDeduplicator.class).to(SnapshotDeduplicatorImpl.class);
+
+ install(new FactoryModuleBuilder()
+ .implement(StreamManager.class, StreamManagerImpl.class)
+ .build(StreamManagerFactory.class));
+ }
+ });
+
+ SchedulerServicesModule.addSchedulerActiveServiceBinding(binder()).to(SnapshotService.class);
}
}