You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2017/12/03 03:59:13 UTC
[4/4] aurora git commit: Extract a storage Persistence layer
Extract a storage Persistence layer
This extracts the `Log`- and `Snapshot`-specific details from `LogStorage`,
leaving `DurableStorage`. `DurableStorage` is useful as a general-purpose
`Storage` mutation observer, while `Persistence` defines the minimal behavior
that an underlying durability layer must provide.
Reviewed at https://reviews.apache.org/r/64234/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/cea43db9
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/cea43db9
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/cea43db9
Branch: refs/heads/master
Commit: cea43db9ded1201f69a85a43fb67244c69cf5347
Parents: de8b375
Author: Bill Farner <wf...@apache.org>
Authored: Sat Dec 2 19:59:03 2017 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Sat Dec 2 19:59:03 2017 -0800
----------------------------------------------------------------------
.../apache/aurora/codec/ThriftBinaryCodec.java | 2 +-
.../aurora/scheduler/base/TaskTestUtil.java | 2 +-
.../configuration/ConfigurationManager.java | 2 +-
.../scheduler/resources/ResourceManager.java | 2 +-
.../storage/CallOrderEnforcingStorage.java | 6 -
.../storage/DistributedSnapshotStore.java | 15 +-
.../aurora/scheduler/storage/Storage.java | 10 -
.../scheduler/storage/backup/Recovery.java | 2 +-
.../storage/backup/TemporaryStorage.java | 2 +-
.../storage/durability/DurableStorage.java | 350 ++++++++
.../storage/durability/Persistence.java | 64 ++
.../storage/durability/ThriftBackfill.java | 175 ++++
.../storage/durability/TransactionRecorder.java | 122 +++
.../storage/durability/WriteAheadStorage.java | 368 ++++++++
.../scheduler/storage/log/LogPersistence.java | 257 ++++++
.../scheduler/storage/log/LogStorage.java | 576 ------------
.../scheduler/storage/log/LogStorageModule.java | 13 +-
.../storage/log/SnapshotStoreImpl.java | 1 +
.../scheduler/storage/log/StreamManager.java | 15 +-
.../storage/log/StreamManagerImpl.java | 46 +-
.../scheduler/storage/log/ThriftBackfill.java | 175 ----
.../storage/log/WriteAheadStorage.java | 369 --------
.../thrift/SchedulerThriftInterface.java | 8 +-
.../app/local/FakeNonVolatileStorage.java | 5 -
.../scheduler/app/local/LocalSchedulerMain.java | 13 +-
.../scheduler/storage/backup/RecoveryTest.java | 4 +-
.../storage/durability/DurableStorageTest.java | 781 ++++++++++++++++
.../storage/durability/ThriftBackfillTest.java | 222 +++++
.../durability/TransactionRecorderTest.java | 78 ++
.../durability/WriteAheadStorageTest.java | 166 ++++
.../scheduler/storage/log/LogManagerTest.java | 86 +-
.../scheduler/storage/log/LogStorageTest.java | 897 -------------------
.../storage/log/NonVolatileStorageTest.java | 5 +-
.../storage/log/SnapshotStoreImplIT.java | 1 +
.../storage/log/ThriftBackfillTest.java | 222 -----
.../storage/log/WriteAheadStorageTest.java | 165 ----
.../thrift/SchedulerThriftInterfaceTest.java | 8 +-
.../aurora/scheduler/thrift/ThriftIT.java | 2 +
38 files changed, 2687 insertions(+), 2550 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/codec/ThriftBinaryCodec.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/codec/ThriftBinaryCodec.java b/src/main/java/org/apache/aurora/codec/ThriftBinaryCodec.java
index 3c12532..cdbe359 100644
--- a/src/main/java/org/apache/aurora/codec/ThriftBinaryCodec.java
+++ b/src/main/java/org/apache/aurora/codec/ThriftBinaryCodec.java
@@ -217,7 +217,7 @@ public final class ThriftBinaryCodec {
/**
* Thrown when serialization or deserialization failed.
*/
- public static class CodingException extends Exception {
+ public static class CodingException extends RuntimeException {
public CodingException(String message) {
super(message);
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java b/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
index 5fe7b9b..e1f20f4 100644
--- a/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
+++ b/src/main/java/org/apache/aurora/scheduler/base/TaskTestUtil.java
@@ -47,10 +47,10 @@ import org.apache.aurora.scheduler.configuration.ConfigurationManager;
import org.apache.aurora.scheduler.configuration.ConfigurationManager.ConfigurationManagerSettings;
import org.apache.aurora.scheduler.configuration.executor.ExecutorConfig;
import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
+import org.apache.aurora.scheduler.storage.durability.ThriftBackfill;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.apache.aurora.scheduler.storage.log.ThriftBackfill;
import org.apache.mesos.v1.Protos;
import org.apache.mesos.v1.Protos.ExecutorID;
import org.apache.mesos.v1.Protos.ExecutorInfo;
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java b/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
index fa2f39c..f3e98f2 100644
--- a/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/configuration/ConfigurationManager.java
@@ -40,6 +40,7 @@ import org.apache.aurora.scheduler.base.UserProvidedStrings;
import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
import org.apache.aurora.scheduler.resources.ResourceManager;
import org.apache.aurora.scheduler.resources.ResourceType;
+import org.apache.aurora.scheduler.storage.durability.ThriftBackfill;
import org.apache.aurora.scheduler.storage.entities.IConstraint;
import org.apache.aurora.scheduler.storage.entities.IContainer;
import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
@@ -48,7 +49,6 @@ import org.apache.aurora.scheduler.storage.entities.IResource;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
import org.apache.aurora.scheduler.storage.entities.ITaskConstraint;
import org.apache.aurora.scheduler.storage.entities.IValueConstraint;
-import org.apache.aurora.scheduler.storage.log.ThriftBackfill;
import static java.util.Objects.requireNonNull;
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java b/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java
index f9dee22..d093753 100644
--- a/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/resources/ResourceManager.java
@@ -26,12 +26,12 @@ import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;
import org.apache.aurora.gen.ResourceAggregate;
+import org.apache.aurora.scheduler.storage.durability.ThriftBackfill;
import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
import org.apache.aurora.scheduler.storage.entities.IResource;
import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.apache.aurora.scheduler.storage.log.ThriftBackfill;
import org.apache.mesos.v1.Protos.Resource;
import static org.apache.aurora.scheduler.resources.ResourceType.BY_MESOS_NAME;
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java
index 1b10ec5..25fd315 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java
@@ -132,12 +132,6 @@ public class CallOrderEnforcingStorage implements NonVolatileStorage {
return wrapped.write(work);
}
- @Override
- public void snapshot() throws StorageException {
- checkState(State.READY);
- wrapped.snapshot();
- }
-
/**
* Creates a binding module that will wrap a storage class with {@link CallOrderEnforcingStorage},
* exposing the order-enforced storage as {@link Storage} and {@link NonVolatileStorage}.
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/DistributedSnapshotStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/DistributedSnapshotStore.java b/src/main/java/org/apache/aurora/scheduler/storage/DistributedSnapshotStore.java
index 4ddee40..0c6a955 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/DistributedSnapshotStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/DistributedSnapshotStore.java
@@ -15,18 +15,25 @@ package org.apache.aurora.scheduler.storage;
import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
import org.apache.aurora.gen.storage.Snapshot;
+import org.apache.aurora.scheduler.storage.Storage.StorageException;
/**
* A distributed snapshot store that supports persisting globally-visible snapshots.
*/
public interface DistributedSnapshotStore {
+
+ /**
+ * Clean up the underlying storage by optimizing internal data structures. Does not change
+ * externally-visible state but might not run concurrently with write operations.
+ */
+ void snapshot() throws StorageException;
+
/**
- * Writes a snapshot to the distributed storage system.
- * TODO(William Farner): Currently we're hiding some exceptions (which happen to be
- * RuntimeExceptions). Clean these up to be checked, and throw another exception type here.
+ * Identical to {@link #snapshot()}, using a custom {@link Snapshot} rather than an
+ * internally-generated one based on the current state.
*
* @param snapshot Snapshot to write.
* @throws CodingException If the snapshot could not be serialized.
*/
- void persist(Snapshot snapshot) throws CodingException;
+ void snapshotWith(Snapshot snapshot) throws CodingException;
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
index 7d325b6..c9ea1de 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
@@ -196,10 +196,6 @@ public interface Storage {
* Executes the unit of read-only {@code work}. The consistency model creates the possibility
* for a reader to read uncommitted state from a concurrent writer.
* <p>
- * TODO(wfarner): Update this documentation once all stores are backed by
- * {@link org.apache.aurora.scheduler.storage.db.DbStorage}, as the concurrency behavior will then
- * be dictated by the {@link org.mybatis.guice.transactional.Transactional#isolation()} used.
- * <p>
* TODO(wfarner): This method no longer needs to exist now that there is no global locking for
* reads. We could instead directly inject the individual stores where they are used, as long
* as the stores have a layer to replicate what is currently done by
@@ -253,12 +249,6 @@ public interface Storage {
void start(MutateWork.NoResult.Quiet initializationLogic) throws StorageException;
/**
- * Clean up the underlying storage by optimizing internal data structures. Does not change
- * externally-visible state but might not run concurrently with write operations.
- */
- void snapshot() throws StorageException;
-
- /**
* Prepares the underlying storage system for clean shutdown.
*/
void stop();
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java
index 6cd5b2b..3a62f02 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java
@@ -197,7 +197,7 @@ public interface Recovery {
void commit() {
primaryStorage.write((NoResult.Quiet) storeProvider -> {
try {
- distributedStore.persist(tempStorage.toSnapshot());
+ distributedStore.snapshotWith(tempStorage.toSnapshot());
shutDownNow.execute();
} catch (CodingException e) {
throw new IllegalStateException("Failed to encode snapshot.", e);
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
index 3000796..18296b0 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
@@ -27,9 +27,9 @@ import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.storage.SnapshotStore;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
+import org.apache.aurora.scheduler.storage.durability.ThriftBackfill;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl;
-import org.apache.aurora.scheduler.storage.log.ThriftBackfill;
import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
import static java.util.Objects.requireNonNull;
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/durability/DurableStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/DurableStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/DurableStorage.java
new file mode 100644
index 0000000..85b2113
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/durability/DurableStorage.java
@@ -0,0 +1,350 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.durability;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.aurora.common.inject.TimedInterceptor.Timed;
+import org.apache.aurora.common.stats.SlidingStats;
+import org.apache.aurora.gen.HostAttributes;
+import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.gen.storage.SaveCronJob;
+import org.apache.aurora.gen.storage.SaveJobInstanceUpdateEvent;
+import org.apache.aurora.gen.storage.SaveJobUpdateEvent;
+import org.apache.aurora.gen.storage.SaveQuota;
+import org.apache.aurora.scheduler.base.SchedulerException;
+import org.apache.aurora.scheduler.events.EventSink;
+import org.apache.aurora.scheduler.storage.AttributeStore;
+import org.apache.aurora.scheduler.storage.CronJobStore;
+import org.apache.aurora.scheduler.storage.JobUpdateStore;
+import org.apache.aurora.scheduler.storage.QuotaStore;
+import org.apache.aurora.scheduler.storage.SchedulerStore;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
+import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
+import org.apache.aurora.scheduler.storage.TaskStore;
+import org.apache.aurora.scheduler.storage.durability.Persistence.PersistenceException;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A storage implementation that ensures storage mutations are written to a persistence layer.
+ *
+ * <p>In the classic write-ahead log usage we'd perform mutations as follows:
+ * <ol>
+ * <li>record op</li>
+ * <li>perform op locally</li>
+ * <li>persist ops</li>
+ * </ol>
+ *
+ * <p>Writing the operation to persistences ensures we have a record of our mutation in case we
+ * should need to recover state later after a crash or on a new host (assuming the scheduler is
+ * distributed). We then apply the mutation to a local (in-memory) data structure for serving fast
+ * read requests.
+ *
+ * <p>This implementation leverages a local transaction to handle this:
+ * <ol>
+ * <li>start local transaction</li>
+ * <li>perform op locally (uncommitted!)</li>
+ * <li>write op to persistence</li>
+ * </ol>
+ *
+ * <p>If the op fails to apply to local storage we will never persist the op, and if the op
+ * fails to persist, it'll throw and abort the local storage operation as well.
+ */
+public class DurableStorage implements NonVolatileStorage {
+
+ /**
+ * A maintainer for context about open transactions. Assumes that an external entity is
+ * responsible for opening and closing transactions.
+ */
+ interface TransactionManager {
+
+ /**
+ * Checks whether there is an open transaction.
+ *
+ * @return {@code true} if there is an open transaction, {@code false} otherwise.
+ */
+ boolean hasActiveTransaction();
+
+ /**
+ * Adds an operation to the existing transaction.
+ *
+ * @param op Operation to include in the existing transaction.
+ */
+ void log(Op op);
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(DurableStorage.class);
+
+ private final Persistence persistence;
+ private final Storage writeBehindStorage;
+ private final SchedulerStore.Mutable writeBehindSchedulerStore;
+ private final CronJobStore.Mutable writeBehindJobStore;
+ private final TaskStore.Mutable writeBehindTaskStore;
+ private final QuotaStore.Mutable writeBehindQuotaStore;
+ private final AttributeStore.Mutable writeBehindAttributeStore;
+ private final JobUpdateStore.Mutable writeBehindJobUpdateStore;
+ private final ReentrantLock writeLock;
+ private final ThriftBackfill thriftBackfill;
+
+ private final WriteAheadStorage writeAheadStorage;
+
+ // TODO(wfarner): It should be possible to remove this flag now, since all call stacks when
+ // recovering are controlled at this layer (they're all calls to Mutable store implementations).
+ // The more involved change is changing SnapshotStore to accept a Mutable store provider to
+ // avoid a call to Storage.write() when we replay a Snapshot.
+ private boolean recovered = false;
+ private TransactionRecorder transaction = null;
+
+ private final SlidingStats writerWaitStats = new SlidingStats("storage_write_lock_wait", "ns");
+
+ private final Map<Op._Fields, Consumer<Op>> transactionReplayActions;
+
+ @Inject
+ DurableStorage(
+ Persistence persistence,
+ @Volatile Storage delegateStorage,
+ @Volatile SchedulerStore.Mutable schedulerStore,
+ @Volatile CronJobStore.Mutable jobStore,
+ @Volatile TaskStore.Mutable taskStore,
+ @Volatile QuotaStore.Mutable quotaStore,
+ @Volatile AttributeStore.Mutable attributeStore,
+ @Volatile JobUpdateStore.Mutable jobUpdateStore,
+ EventSink eventSink,
+ ReentrantLock writeLock,
+ ThriftBackfill thriftBackfill) {
+
+ this.persistence = requireNonNull(persistence);
+
+ // DurableStorage has two distinct operating modes: pre- and post-recovery. When recovering,
+ // we write directly to the writeBehind stores since we are replaying what's already persisted.
+ // After that, all writes must succeed in Persistence before they may be considered successful.
+ this.writeBehindStorage = requireNonNull(delegateStorage);
+ this.writeBehindSchedulerStore = requireNonNull(schedulerStore);
+ this.writeBehindJobStore = requireNonNull(jobStore);
+ this.writeBehindTaskStore = requireNonNull(taskStore);
+ this.writeBehindQuotaStore = requireNonNull(quotaStore);
+ this.writeBehindAttributeStore = requireNonNull(attributeStore);
+ this.writeBehindJobUpdateStore = requireNonNull(jobUpdateStore);
+ this.writeLock = requireNonNull(writeLock);
+ this.thriftBackfill = requireNonNull(thriftBackfill);
+ TransactionManager transactionManager = new TransactionManager() {
+ @Override
+ public boolean hasActiveTransaction() {
+ return transaction != null;
+ }
+
+ @Override
+ public void log(Op op) {
+ transaction.add(op);
+ }
+ };
+ this.writeAheadStorage = new WriteAheadStorage(
+ transactionManager,
+ schedulerStore,
+ jobStore,
+ taskStore,
+ quotaStore,
+ attributeStore,
+ jobUpdateStore,
+ LoggerFactory.getLogger(WriteAheadStorage.class),
+ eventSink);
+
+ this.transactionReplayActions = buildTransactionReplayActions();
+ }
+
+ @VisibleForTesting
+ final Map<Op._Fields, Consumer<Op>> buildTransactionReplayActions() {
+ return ImmutableMap.<Op._Fields, Consumer<Op>>builder()
+ .put(
+ Op._Fields.SAVE_FRAMEWORK_ID,
+ op -> writeBehindSchedulerStore.saveFrameworkId(op.getSaveFrameworkId().getId()))
+ .put(Op._Fields.SAVE_CRON_JOB, op -> {
+ SaveCronJob cronJob = op.getSaveCronJob();
+ writeBehindJobStore.saveAcceptedJob(
+ thriftBackfill.backfillJobConfiguration(cronJob.getJobConfig()));
+ })
+ .put(
+ Op._Fields.REMOVE_JOB,
+ op -> writeBehindJobStore.removeJob(IJobKey.build(op.getRemoveJob().getJobKey())))
+ .put(
+ Op._Fields.SAVE_TASKS,
+ op -> writeBehindTaskStore.saveTasks(
+ thriftBackfill.backfillTasks(op.getSaveTasks().getTasks())))
+ .put(
+ Op._Fields.REMOVE_TASKS,
+ op -> writeBehindTaskStore.deleteTasks(op.getRemoveTasks().getTaskIds()))
+ .put(Op._Fields.SAVE_QUOTA, op -> {
+ SaveQuota saveQuota = op.getSaveQuota();
+ writeBehindQuotaStore.saveQuota(
+ saveQuota.getRole(),
+ ThriftBackfill.backfillResourceAggregate(saveQuota.getQuota()));
+ })
+ .put(
+ Op._Fields.REMOVE_QUOTA,
+ op -> writeBehindQuotaStore.removeQuota(op.getRemoveQuota().getRole()))
+ .put(Op._Fields.SAVE_HOST_ATTRIBUTES, op -> {
+ HostAttributes attributes = op.getSaveHostAttributes().getHostAttributes();
+ // Prior to commit 5cf760b, the store would persist maintenance mode changes for
+ // unknown hosts. 5cf760b began rejecting these, but the storage may still
+ // contain entries with a null slave ID.
+ if (attributes.isSetSlaveId()) {
+ writeBehindAttributeStore.saveHostAttributes(IHostAttributes.build(attributes));
+ } else {
+ LOG.info("Dropping host attributes with no agent ID: " + attributes);
+ }
+ })
+ .put(Op._Fields.SAVE_JOB_UPDATE, op ->
+ writeBehindJobUpdateStore.saveJobUpdate(
+ thriftBackfill.backFillJobUpdate(op.getSaveJobUpdate().getJobUpdate())))
+ .put(Op._Fields.SAVE_JOB_UPDATE_EVENT, op -> {
+ SaveJobUpdateEvent event = op.getSaveJobUpdateEvent();
+ writeBehindJobUpdateStore.saveJobUpdateEvent(
+ IJobUpdateKey.build(event.getKey()),
+ IJobUpdateEvent.build(op.getSaveJobUpdateEvent().getEvent()));
+ })
+ .put(Op._Fields.SAVE_JOB_INSTANCE_UPDATE_EVENT, op -> {
+ SaveJobInstanceUpdateEvent event = op.getSaveJobInstanceUpdateEvent();
+ writeBehindJobUpdateStore.saveJobInstanceUpdateEvent(
+ IJobUpdateKey.build(event.getKey()),
+ IJobInstanceUpdateEvent.build(op.getSaveJobInstanceUpdateEvent().getEvent()));
+ })
+ .put(Op._Fields.PRUNE_JOB_UPDATE_HISTORY, op -> {
+ LOG.info("Dropping prune operation. Updates will be pruned later.");
+ })
+ .put(Op._Fields.REMOVE_JOB_UPDATE, op ->
+ writeBehindJobUpdateStore.removeJobUpdates(
+ IJobUpdateKey.setFromBuilders(op.getRemoveJobUpdate().getKeys())))
+ .build();
+ }
+
+ @Override
+ @Timed("scheduler_storage_prepare")
+ public synchronized void prepare() {
+ writeBehindStorage.prepare();
+ persistence.prepare();
+ }
+
+ @Override
+ @Timed("scheduler_storage_start")
+ public synchronized void start(final MutateWork.NoResult.Quiet initializationLogic) {
+ write((NoResult.Quiet) unused -> {
+ // Must have the underlying storage started so we can query it.
+ // We replay these entries in the forwarded storage system's transactions but not ours - we
+ // do not want to re-record these ops.
+ recover();
+ recovered = true;
+
+ // Now that we're recovered we should persist any mutations done in initializationLogic, so
+ // run it in one of our transactions.
+ write(initializationLogic);
+ });
+ }
+
+ @Override
+ public void stop() {
+ // No-op.
+ }
+
+ @Timed("scheduler_storage_recover")
+ void recover() throws RecoveryFailedException {
+ try {
+ persistence.recover().forEach(DurableStorage.this::replayOp);
+ } catch (PersistenceException e) {
+ throw new RecoveryFailedException(e);
+ }
+ }
+
+ private static final class RecoveryFailedException extends SchedulerException {
+ RecoveryFailedException(Throwable cause) {
+ super(cause);
+ }
+ }
+
+ private void replayOp(Op op) {
+ Op._Fields opField = op.getSetField();
+ if (!transactionReplayActions.containsKey(opField)) {
+ throw new IllegalStateException("Unknown transaction op: " + opField);
+ }
+
+ transactionReplayActions.get(opField).accept(op);
+ }
+
+ private <T, E extends Exception> T doInTransaction(final MutateWork<T, E> work)
+ throws StorageException, E {
+
+ // The transaction has already been set up so we just need to delegate with our store provider
+ // so any mutations may be persisted.
+ if (transaction != null) {
+ return work.apply(writeAheadStorage);
+ }
+
+ transaction = new TransactionRecorder();
+ try {
+ return writeBehindStorage.write(unused -> {
+ T result = work.apply(writeAheadStorage);
+ List<Op> ops = transaction.getOps();
+ if (!ops.isEmpty()) {
+ try {
+ persistence.persist(ops.stream());
+ } catch (PersistenceException e) {
+ throw new StorageException("Failed to persist storage changes", e);
+ }
+ }
+ return result;
+ });
+ } finally {
+ transaction = null;
+ }
+ }
+
+ @Override
+ public <T, E extends Exception> T write(final MutateWork<T, E> work) throws StorageException, E {
+ long waitStart = System.nanoTime();
+ writeLock.lock();
+ try {
+ writerWaitStats.accumulate(System.nanoTime() - waitStart);
+ // We don't want to persist when recovering, we just want to update the underlying
+ // store - so pass mutations straight through to the underlying storage.
+ if (!recovered) {
+ return writeBehindStorage.write(work);
+ }
+
+ return doInTransaction(work);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ @Override
+ public <T, E extends Exception> T read(Work<T, E> work) throws StorageException, E {
+ return writeBehindStorage.read(work);
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/durability/Persistence.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/Persistence.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/Persistence.java
new file mode 100644
index 0000000..9eb862c
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/durability/Persistence.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.durability;
+
+import java.util.stream.Stream;
+
+import org.apache.aurora.gen.storage.Op;
+
+/**
+ * Abstraction over the durable layer that storage operations are saved to and recovered from.
+ */
+public interface Persistence {
+
+  /**
+   * Readies the persistence layer for use. An implementation might, for instance, advertise
+   * itself as a replica to cohort schedulers or start syncing state to serve as a warm standby.
+   */
+  void prepare();
+
+  /**
+   * Reads back every record that was previously persisted.
+   *
+   * @return Every record that has been persisted.
+   * @throws PersistenceException If the records could not be recovered.
+   */
+  Stream<Op> recover() throws PersistenceException;
+
+  /**
+   * Durably saves the given records. None of the records may be treated as saved until this
+   * method has returned without throwing.
+   *
+   * @param records Records to save.
+   * @throws PersistenceException If saving the records failed.
+   */
+  void persist(Stream<Op> records) throws PersistenceException;
+
+  /**
+   * Signals that a persistence operation did not succeed.
+   */
+  class PersistenceException extends Exception {
+    public PersistenceException(String message, Throwable cause) {
+      super(message, cause);
+    }
+
+    public PersistenceException(String message) {
+      super(message);
+    }
+
+    public PersistenceException(Throwable cause) {
+      super(cause);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/durability/ThriftBackfill.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/ThriftBackfill.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/ThriftBackfill.java
new file mode 100644
index 0000000..4425d02
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/durability/ThriftBackfill.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.durability;
+
+import java.util.EnumSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import com.google.inject.Inject;
+
+import org.apache.aurora.GuavaUtils;
+import org.apache.aurora.gen.JobConfiguration;
+import org.apache.aurora.gen.JobUpdate;
+import org.apache.aurora.gen.JobUpdateInstructions;
+import org.apache.aurora.gen.Resource;
+import org.apache.aurora.gen.ResourceAggregate;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.TierInfo;
+import org.apache.aurora.scheduler.TierManager;
+import org.apache.aurora.scheduler.quota.QuotaManager;
+import org.apache.aurora.scheduler.resources.ResourceType;
+import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
+import org.apache.aurora.scheduler.storage.entities.IResource;
+import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+
+import static org.apache.aurora.scheduler.resources.ResourceType.CPUS;
+import static org.apache.aurora.scheduler.resources.ResourceType.DISK_MB;
+import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB;
+
+/**
+ * Helps migrating thrift schema by populating deprecated and/or replacement fields.
+ */
+public final class ThriftBackfill {
+
+ private final TierManager tierManager;
+
+ @Inject
+ public ThriftBackfill(TierManager tierManager) {
+ this.tierManager = requireNonNull(tierManager);
+ }
+
+ private static Resource getResource(Set<Resource> resources, ResourceType type) {
+ return resources.stream()
+ .filter(e -> ResourceType.fromResource(IResource.build(e)).equals(type))
+ .findFirst()
+ .orElseThrow(() ->
+ new IllegalArgumentException("Missing resource definition for " + type));
+ }
+
+ /**
+ * Ensures TaskConfig.resources and correspondent task-level fields are all populated.
+ *
+ * @param config TaskConfig to backfill.
+ * @return Backfilled TaskConfig.
+ */
+ public TaskConfig backfillTask(TaskConfig config) {
+ backfillTier(config);
+ return config;
+ }
+
+ private void backfillTier(TaskConfig config) {
+ ITaskConfig taskConfig = ITaskConfig.build(config);
+ if (config.isSetTier()) {
+ TierInfo tier = tierManager.getTier(taskConfig);
+ config.setProduction(!tier.isPreemptible() && !tier.isRevocable());
+ } else {
+ config.setTier(tierManager.getTiers()
+ .entrySet()
+ .stream()
+ .filter(e -> e.getValue().isPreemptible() == !taskConfig.isProduction()
+ && !e.getValue().isRevocable())
+ .findFirst()
+ .orElseThrow(() -> new IllegalStateException(
+ format("No matching implicit tier for task of job %s", taskConfig.getJob())))
+ .getKey());
+ }
+ }
+
+ /**
+ * Backfills JobConfiguration. See {@link #backfillTask(TaskConfig)}.
+ *
+ * @param jobConfig JobConfiguration to backfill.
+ * @return Backfilled JobConfiguration.
+ */
+ public IJobConfiguration backfillJobConfiguration(JobConfiguration jobConfig) {
+ backfillTask(jobConfig.getTaskConfig());
+ return IJobConfiguration.build(jobConfig);
+ }
+
+ /**
+ * Backfills set of tasks. See {@link #backfillTask(TaskConfig)}.
+ *
+ * @param tasks Set of tasks to backfill.
+ * @return Backfilled set of tasks.
+ */
+ public Set<IScheduledTask> backfillTasks(Set<ScheduledTask> tasks) {
+ return tasks.stream()
+ .map(t -> backfillScheduledTask(t))
+ .map(IScheduledTask::build)
+ .collect(GuavaUtils.toImmutableSet());
+ }
+
+ /**
+ * Ensures ResourceAggregate.resources and correspondent deprecated fields are all populated.
+ *
+ * @param aggregate ResourceAggregate to backfill.
+ * @return Backfilled IResourceAggregate.
+ */
+ public static IResourceAggregate backfillResourceAggregate(ResourceAggregate aggregate) {
+ if (!aggregate.isSetResources() || aggregate.getResources().isEmpty()) {
+ aggregate.addToResources(Resource.numCpus(aggregate.getNumCpus()));
+ aggregate.addToResources(Resource.ramMb(aggregate.getRamMb()));
+ aggregate.addToResources(Resource.diskMb(aggregate.getDiskMb()));
+ } else {
+ EnumSet<ResourceType> quotaResources = QuotaManager.QUOTA_RESOURCE_TYPES;
+ if (aggregate.getResources().size() > quotaResources.size()) {
+ throw new IllegalArgumentException("Too many resource values in quota.");
+ }
+
+ if (!quotaResources.equals(aggregate.getResources().stream()
+ .map(e -> ResourceType.fromResource(IResource.build(e)))
+ .collect(Collectors.toSet()))) {
+
+ throw new IllegalArgumentException("Quota resources must be exactly: " + quotaResources);
+ }
+ aggregate.setNumCpus(
+ getResource(aggregate.getResources(), CPUS).getNumCpus());
+ aggregate.setRamMb(
+ getResource(aggregate.getResources(), RAM_MB).getRamMb());
+ aggregate.setDiskMb(
+ getResource(aggregate.getResources(), DISK_MB).getDiskMb());
+ }
+ return IResourceAggregate.build(aggregate);
+ }
+
+ private ScheduledTask backfillScheduledTask(ScheduledTask task) {
+ backfillTask(task.getAssignedTask().getTask());
+ return task;
+ }
+
+ /**
+ * Backfills JobUpdate. See {@link #backfillTask(TaskConfig)}.
+ *
+ * @param update JobUpdate to backfill.
+ * @return Backfilled job update.
+ */
+ public IJobUpdate backFillJobUpdate(JobUpdate update) {
+ JobUpdateInstructions instructions = update.getInstructions();
+ if (instructions.isSetDesiredState()) {
+ backfillTask(instructions.getDesiredState().getTask());
+ }
+
+ instructions.getInitialState().forEach(e -> backfillTask(e.getTask()));
+
+ return IJobUpdate.build(update);
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/durability/TransactionRecorder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/TransactionRecorder.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/TransactionRecorder.java
new file mode 100644
index 0000000..1c811e3
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/durability/TransactionRecorder.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.durability;
+
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.gen.storage.RemoveTasks;
+import org.apache.aurora.gen.storage.SaveHostAttributes;
+import org.apache.aurora.gen.storage.SaveTasks;
+
+/**
+ * Records a sequence of mutations to the storage, merging an op into the previously recorded op
+ * where possible to reduce the number and size of recorded ops.
+ */
+class TransactionRecorder {
+  private final List<Op> ops = Lists.newArrayList();
+
+  /**
+   * Records an op, coalescing it into the most recently recorded op when both are compatible.
+   *
+   * @param op Op to record.
+   */
+  void add(Op op) {
+    Op prior = Iterables.getLast(ops, null);
+    if (prior == null || !coalesce(prior, op)) {
+      ops.add(op);
+    }
+  }
+
+  /**
+   * Gets the recorded ops in recording order.
+   *
+   * @return The recorder's backing list (not a copy).
+   */
+  List<Op> getOps() {
+    return ops;
+  }
+
+  /**
+   * Tries to coalesce a new op into the prior to compact the binary representation and increase
+   * batching.
+   *
+   * @param prior The previous op.
+   * @param next The next op to be added.
+   * @return {@code true} if the next op was coalesced into the prior, {@code false} otherwise.
+   */
+  private boolean coalesce(Op prior, Op next) {
+    // Both unions must hold a value: getSetField() is null for an unset union, and an unset
+    // prior would otherwise fail with a NullPointerException below.
+    if (!prior.isSet() || !next.isSet()) {
+      return false;
+    }
+
+    Op._Fields priorType = prior.getSetField();
+    if (!priorType.equals(next.getSetField())) {
+      return false;
+    }
+
+    switch (priorType) {
+      case SAVE_FRAMEWORK_ID:
+        prior.setSaveFrameworkId(next.getSaveFrameworkId());
+        return true;
+      case SAVE_TASKS:
+        coalesce(prior.getSaveTasks(), next.getSaveTasks());
+        return true;
+      case REMOVE_TASKS:
+        coalesce(prior.getRemoveTasks(), next.getRemoveTasks());
+        return true;
+      case SAVE_HOST_ATTRIBUTES:
+        return coalesce(prior.getSaveHostAttributes(), next.getSaveHostAttributes());
+      default:
+        return false;
+    }
+  }
+
+  private void coalesce(SaveTasks prior, SaveTasks next) {
+    if (next.isSetTasks()) {
+      if (prior.isSetTasks()) {
+        // It is an expected invariant that an operation may reference a task (identified by
+        // task ID) no more than one time. Therefore, to coalesce two SaveTasks operations,
+        // the most recent task definition overrides the prior operation.
+        Map<String, ScheduledTask> coalesced = Maps.newHashMap();
+        for (ScheduledTask task : prior.getTasks()) {
+          coalesced.put(task.getAssignedTask().getTaskId(), task);
+        }
+        for (ScheduledTask task : next.getTasks()) {
+          coalesced.put(task.getAssignedTask().getTaskId(), task);
+        }
+        prior.setTasks(ImmutableSet.copyOf(coalesced.values()));
+      } else {
+        prior.setTasks(next.getTasks());
+      }
+    }
+  }
+
+  private void coalesce(RemoveTasks prior, RemoveTasks next) {
+    if (next.isSetTaskIds()) {
+      if (prior.isSetTaskIds()) {
+        prior.setTaskIds(ImmutableSet.<String>builder()
+            .addAll(prior.getTaskIds())
+            .addAll(next.getTaskIds())
+            .build());
+      } else {
+        prior.setTaskIds(next.getTaskIds());
+      }
+    }
+  }
+
+  private boolean coalesce(SaveHostAttributes prior, SaveHostAttributes next) {
+    if (prior.getHostAttributes().getHost().equals(next.getHostAttributes().getHost())) {
+      // A later save of the same host supersedes the earlier one entirely. Copying only the
+      // attribute set would drop changes to the remaining fields (e.g. maintenance mode).
+      prior.setHostAttributes(next.getHostAttributes());
+      return true;
+    }
+    return false;
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorage.java
new file mode 100644
index 0000000..667db06
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/durability/WriteAheadStorage.java
@@ -0,0 +1,368 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.durability;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.gen.storage.RemoveJob;
+import org.apache.aurora.gen.storage.RemoveQuota;
+import org.apache.aurora.gen.storage.RemoveTasks;
+import org.apache.aurora.gen.storage.SaveCronJob;
+import org.apache.aurora.gen.storage.SaveFrameworkId;
+import org.apache.aurora.gen.storage.SaveHostAttributes;
+import org.apache.aurora.gen.storage.SaveJobInstanceUpdateEvent;
+import org.apache.aurora.gen.storage.SaveJobUpdate;
+import org.apache.aurora.gen.storage.SaveJobUpdateEvent;
+import org.apache.aurora.gen.storage.SaveQuota;
+import org.apache.aurora.gen.storage.SaveTasks;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.events.EventSink;
+import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.storage.AttributeStore;
+import org.apache.aurora.scheduler.storage.CronJobStore;
+import org.apache.aurora.scheduler.storage.JobUpdateStore;
+import org.apache.aurora.scheduler.storage.QuotaStore;
+import org.apache.aurora.scheduler.storage.SchedulerStore;
+import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import org.apache.aurora.scheduler.storage.TaskStore;
+import org.apache.aurora.scheduler.storage.durability.DurableStorage.TransactionManager;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
+import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery;
+import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.slf4j.Logger;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Mutable stores implementation that translates mutations to {@link Op}s (which are passed
+ * to a provided {@link TransactionManager}) and forwards the operations to delegate mutable
+ * stores.
+ *
+ * <p>Most mutations are recorded before being forwarded. {@link #saveHostAttributes} and
+ * {@link #mutateTask} forward first, because the recorded op depends on the delegate's result.
+ * Reads are forwarded directly to the delegates, and bulk deletions are unsupported.
+ */
+public class WriteAheadStorage implements
+    MutableStoreProvider,
+    SchedulerStore.Mutable,
+    CronJobStore.Mutable,
+    TaskStore.Mutable,
+    QuotaStore.Mutable,
+    AttributeStore.Mutable,
+    JobUpdateStore.Mutable {
+
+  private final TransactionManager transactionManager;
+  private final SchedulerStore.Mutable schedulerStore;
+  private final CronJobStore.Mutable jobStore;
+  private final TaskStore.Mutable taskStore;
+  private final QuotaStore.Mutable quotaStore;
+  private final AttributeStore.Mutable attributeStore;
+  private final JobUpdateStore.Mutable jobUpdateStore;
+  private final Logger log;
+  private final EventSink eventSink;
+
+  /**
+   * Creates a new write-ahead storage that delegates to the providing default stores.
+   *
+   * @param transactionManager External controller for transaction operations.
+   * @param schedulerStore Delegate.
+   * @param jobStore Delegate.
+   * @param taskStore Delegate.
+   * @param quotaStore Delegate.
+   * @param attributeStore Delegate.
+   * @param jobUpdateStore Delegate.
+   * @param log Logger for debug output about recorded task mutations.
+   * @param eventSink Sink that receives {@link PubsubEvent.HostAttributesChanged} events.
+   */
+  public WriteAheadStorage(
+      TransactionManager transactionManager,
+      SchedulerStore.Mutable schedulerStore,
+      CronJobStore.Mutable jobStore,
+      TaskStore.Mutable taskStore,
+      QuotaStore.Mutable quotaStore,
+      AttributeStore.Mutable attributeStore,
+      JobUpdateStore.Mutable jobUpdateStore,
+      Logger log,
+      EventSink eventSink) {
+
+    this.transactionManager = requireNonNull(transactionManager);
+    this.schedulerStore = requireNonNull(schedulerStore);
+    this.jobStore = requireNonNull(jobStore);
+    this.taskStore = requireNonNull(taskStore);
+    this.quotaStore = requireNonNull(quotaStore);
+    this.attributeStore = requireNonNull(attributeStore);
+    this.jobUpdateStore = requireNonNull(jobUpdateStore);
+    this.log = requireNonNull(log);
+    this.eventSink = requireNonNull(eventSink);
+  }
+
+  /**
+   * Records an op with the transaction manager.
+   *
+   * @param op Op to record.
+   * @throws IllegalStateException If no transaction is active.
+   */
+  private void write(Op op) {
+    Preconditions.checkState(
+        transactionManager.hasActiveTransaction(),
+        "Mutating operations must be within a transaction.");
+    transactionManager.log(op);
+  }
+
+  @Override
+  public void saveFrameworkId(final String frameworkId) {
+    requireNonNull(frameworkId);
+
+    write(Op.saveFrameworkId(new SaveFrameworkId(frameworkId)));
+    schedulerStore.saveFrameworkId(frameworkId);
+  }
+
+  @Override
+  public void deleteTasks(final Set<String> taskIds) {
+    requireNonNull(taskIds);
+
+    write(Op.removeTasks(new RemoveTasks(taskIds)));
+    taskStore.deleteTasks(taskIds);
+  }
+
+  @Override
+  public void saveTasks(final Set<IScheduledTask> newTasks) {
+    requireNonNull(newTasks);
+
+    write(Op.saveTasks(new SaveTasks(IScheduledTask.toBuildersSet(newTasks))));
+    taskStore.saveTasks(newTasks);
+  }
+
+  @Override
+  public Optional<IScheduledTask> mutateTask(
+      String taskId,
+      Function<IScheduledTask, IScheduledTask> mutator) {
+
+    requireNonNull(taskId);
+    requireNonNull(mutator);
+
+    Optional<IScheduledTask> mutated = taskStore.mutateTask(taskId, mutator);
+    // The delegate may return an absent value (presumably when no such task exists). In that
+    // case there is nothing to record, and calling get() would throw.
+    if (mutated.isPresent()) {
+      log.debug("Storing updated task to log: {}={}", taskId, mutated.get().getStatus());
+      write(Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))));
+    }
+
+    return mutated;
+  }
+
+  @Override
+  public void saveQuota(final String role, final IResourceAggregate quota) {
+    requireNonNull(role);
+    requireNonNull(quota);
+
+    write(Op.saveQuota(new SaveQuota(role, quota.newBuilder())));
+    quotaStore.saveQuota(role, quota);
+  }
+
+  @Override
+  public boolean saveHostAttributes(final IHostAttributes attrs) {
+    requireNonNull(attrs);
+
+    // Only record (and announce) the attributes when the delegate reports an actual change.
+    boolean changed = attributeStore.saveHostAttributes(attrs);
+    if (changed) {
+      write(Op.saveHostAttributes(new SaveHostAttributes(attrs.newBuilder())));
+      eventSink.post(new PubsubEvent.HostAttributesChanged(attrs));
+    }
+    return changed;
+  }
+
+  @Override
+  public void removeJob(final IJobKey jobKey) {
+    requireNonNull(jobKey);
+
+    write(Op.removeJob(new RemoveJob().setJobKey(jobKey.newBuilder())));
+    jobStore.removeJob(jobKey);
+  }
+
+  @Override
+  public void saveAcceptedJob(final IJobConfiguration jobConfig) {
+    requireNonNull(jobConfig);
+
+    write(Op.saveCronJob(new SaveCronJob(jobConfig.newBuilder())));
+    jobStore.saveAcceptedJob(jobConfig);
+  }
+
+  @Override
+  public void removeQuota(final String role) {
+    requireNonNull(role);
+
+    write(Op.removeQuota(new RemoveQuota(role)));
+    quotaStore.removeQuota(role);
+  }
+
+  @Override
+  public void saveJobUpdate(IJobUpdate update) {
+    requireNonNull(update);
+
+    write(Op.saveJobUpdate(new SaveJobUpdate().setJobUpdate(update.newBuilder())));
+    jobUpdateStore.saveJobUpdate(update);
+  }
+
+  @Override
+  public void saveJobUpdateEvent(IJobUpdateKey key, IJobUpdateEvent event) {
+    requireNonNull(key);
+    requireNonNull(event);
+
+    write(Op.saveJobUpdateEvent(new SaveJobUpdateEvent(event.newBuilder(), key.newBuilder())));
+    jobUpdateStore.saveJobUpdateEvent(key, event);
+  }
+
+  @Override
+  public void saveJobInstanceUpdateEvent(IJobUpdateKey key, IJobInstanceUpdateEvent event) {
+    requireNonNull(key);
+    requireNonNull(event);
+
+    write(Op.saveJobInstanceUpdateEvent(
+        new SaveJobInstanceUpdateEvent(event.newBuilder(), key.newBuilder())));
+    jobUpdateStore.saveJobInstanceUpdateEvent(key, event);
+  }
+
+  @Override
+  public void removeJobUpdates(Set<IJobUpdateKey> keys) {
+    requireNonNull(keys);
+
+    // Compatibility mode - RemoveJobUpdates is not yet written since older versions cannot
+    // read it. JobUpdates are only removed implicitly when a snapshot is taken.
+    jobUpdateStore.removeJobUpdates(keys);
+  }
+
+  @Override
+  public void deleteAllTasks() {
+    throw new UnsupportedOperationException(
+        "Unsupported since casual storage users should never be doing this.");
+  }
+
+  @Override
+  public void deleteHostAttributes() {
+    throw new UnsupportedOperationException(
+        "Unsupported since casual storage users should never be doing this.");
+  }
+
+  @Override
+  public void deleteJobs() {
+    throw new UnsupportedOperationException(
+        "Unsupported since casual storage users should never be doing this.");
+  }
+
+  @Override
+  public void deleteQuotas() {
+    throw new UnsupportedOperationException(
+        "Unsupported since casual storage users should never be doing this.");
+  }
+
+  @Override
+  public void deleteAllUpdates() {
+    throw new UnsupportedOperationException(
+        "Unsupported since casual storage users should never be doing this.");
+  }
+
+  @Override
+  public SchedulerStore.Mutable getSchedulerStore() {
+    return this;
+  }
+
+  @Override
+  public CronJobStore.Mutable getCronJobStore() {
+    return this;
+  }
+
+  @Override
+  public TaskStore.Mutable getUnsafeTaskStore() {
+    return this;
+  }
+
+  @Override
+  public QuotaStore.Mutable getQuotaStore() {
+    return this;
+  }
+
+  @Override
+  public AttributeStore.Mutable getAttributeStore() {
+    return this;
+  }
+
+  @Override
+  public TaskStore getTaskStore() {
+    return this;
+  }
+
+  @Override
+  public JobUpdateStore.Mutable getJobUpdateStore() {
+    return this;
+  }
+
+  @Override
+  public Optional<String> fetchFrameworkId() {
+    return this.schedulerStore.fetchFrameworkId();
+  }
+
+  @Override
+  public Iterable<IJobConfiguration> fetchJobs() {
+    return this.jobStore.fetchJobs();
+  }
+
+  @Override
+  public Optional<IJobConfiguration> fetchJob(IJobKey jobKey) {
+    return this.jobStore.fetchJob(jobKey);
+  }
+
+  @Override
+  public Optional<IScheduledTask> fetchTask(String taskId) {
+    return this.taskStore.fetchTask(taskId);
+  }
+
+  @Override
+  public Iterable<IScheduledTask> fetchTasks(Query.Builder query) {
+    return this.taskStore.fetchTasks(query);
+  }
+
+  @Override
+  public Set<IJobKey> getJobKeys() {
+    return this.taskStore.getJobKeys();
+  }
+
+  @Override
+  public Optional<IResourceAggregate> fetchQuota(String role) {
+    return this.quotaStore.fetchQuota(role);
+  }
+
+  @Override
+  public Map<String, IResourceAggregate> fetchQuotas() {
+    return this.quotaStore.fetchQuotas();
+  }
+
+  @Override
+  public Optional<IHostAttributes> getHostAttributes(String host) {
+    return this.attributeStore.getHostAttributes(host);
+  }
+
+  @Override
+  public Set<IHostAttributes> getHostAttributes() {
+    return this.attributeStore.getHostAttributes();
+  }
+
+  @Override
+  public List<IJobUpdateDetails> fetchJobUpdates(IJobUpdateQuery query) {
+    return this.jobUpdateStore.fetchJobUpdates(query);
+  }
+
+  @Override
+  public Optional<IJobUpdateDetails> fetchJobUpdate(IJobUpdateKey key) {
+    return this.jobUpdateStore.fetchJobUpdate(key);
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistence.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistence.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistence.java
new file mode 100644
index 0000000..a0a6b6c
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistence.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.log;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.MoreExecutors;
+
+import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
+import org.apache.aurora.common.application.ShutdownRegistry;
+import org.apache.aurora.common.inject.TimedInterceptor.Timed;
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.gen.storage.LogEntry;
+import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.gen.storage.Snapshot;
+import org.apache.aurora.scheduler.base.AsyncUtil;
+import org.apache.aurora.scheduler.log.Log.Stream.InvalidPositionException;
+import org.apache.aurora.scheduler.log.Log.Stream.StreamAccessException;
+import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
+import org.apache.aurora.scheduler.storage.SnapshotStore;
+import org.apache.aurora.scheduler.storage.Storage.StorageException;
+import org.apache.aurora.scheduler.storage.durability.Persistence;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Persistence layer that uses a replicated log.
+ *
+ * <p>{@link #prepare()} opens the log and must be called before {@link #persist(Stream)},
+ * {@link #recover()} or any snapshot operation.
+ */
+class LogPersistence implements Persistence, DistributedSnapshotStore {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LogPersistence.class);
+
+  private final LogManager logManager;
+  private final SnapshotStore<Snapshot> snapshotStore;
+  private final SchedulingService schedulingService;
+  private final Amount<Long, Time> snapshotInterval;
+  // Assigned by prepare(); null until the log has been opened.
+  private StreamManager streamManager;
+
+  @Inject
+  LogPersistence(
+      Settings settings,
+      LogManager logManager,
+      SnapshotStore<Snapshot> snapshotStore,
+      ShutdownRegistry shutdownRegistry) {
+
+    this(new ScheduledExecutorSchedulingService(
+            shutdownRegistry,
+            settings.getShutdownGracePeriod()),
+        settings.getSnapshotInterval(),
+        logManager,
+        snapshotStore);
+  }
+
+  @VisibleForTesting
+  LogPersistence(
+      SchedulingService schedulingService,
+      Amount<Long, Time> snapshotInterval,
+      LogManager logManager,
+      SnapshotStore<Snapshot> snapshotStore) {
+
+    this.schedulingService = requireNonNull(schedulingService);
+    this.snapshotInterval = requireNonNull(snapshotInterval);
+    this.logManager = requireNonNull(logManager);
+    this.snapshotStore = requireNonNull(snapshotStore);
+  }
+
+  @Override
+  public void prepare() {
+    // Open the log to make a log replica available to the scheduler group.
+    try {
+      streamManager = logManager.open();
+    } catch (IOException e) {
+      throw new IllegalStateException("Failed to open the log, cannot continue", e);
+    }
+  }
+
+  /**
+   * Gets the stream manager opened by {@link #prepare()}.
+   *
+   * @return The open stream manager.
+   * @throws IllegalStateException If the log has not been opened yet.
+   */
+  private StreamManager openedStream() {
+    if (streamManager == null) {
+      throw new IllegalStateException("The log has not been opened; call prepare() first.");
+    }
+    return streamManager;
+  }
+
+  @Override
+  public void persist(Stream<Op> mutations) throws PersistenceException {
+    StreamTransaction transaction = openedStream().startTransaction();
+    mutations.forEach(transaction::add);
+    try {
+      transaction.commit();
+    } catch (CodingException e) {
+      throw new PersistenceException(e);
+    }
+  }
+
+  @Override
+  public Stream<Op> recover() throws PersistenceException {
+    StreamManager stream = openedStream();
+
+    // NOTE(review): snapshots are scheduled before the returned (lazy) stream is consumed. With
+    // the default scheduling service the first snapshot only fires after a full interval, but
+    // confirm the caller finishes replay (or blocks snapshots) before then.
+    scheduleSnapshots();
+
+    try {
+      Iterator<LogEntry> entries = stream.readFromBeginning();
+      Iterable<LogEntry> iterableEntries = () -> entries;
+      // NOTE(review): only errors raised by readFromBeginning() itself are wrapped below; any
+      // failure surfacing while the stream is iterated propagates to the consumer unwrapped.
+      return StreamSupport.stream(iterableEntries.spliterator(), false)
+          .flatMap(this::opsFromEntry);
+    } catch (CodingException | InvalidPositionException | StreamAccessException e) {
+      throw new PersistenceException(e);
+    }
+  }
+
+  /**
+   * Translates a single log entry into the ops it carries. NOOP entries yield nothing; snapshot
+   * entries are applied to the snapshot store and yield nothing.
+   *
+   * @param entry Log entry to translate.
+   * @return The ops contained in a transaction entry, otherwise an empty stream.
+   * @throws IllegalStateException If the entry is of any other (or no) type.
+   */
+  private Stream<Op> opsFromEntry(LogEntry entry) {
+    LogEntry._Fields type = entry.getSetField();
+    if (type == LogEntry._Fields.NOOP) {
+      return Stream.empty();
+    }
+    if (type == LogEntry._Fields.SNAPSHOT) {
+      Snapshot snapshot = entry.getSnapshot();
+      LOG.info("Applying snapshot taken on {}", new Date(snapshot.getTimestamp()));
+      snapshotStore.applySnapshot(snapshot);
+      return Stream.empty();
+    }
+    if (type == LogEntry._Fields.TRANSACTION) {
+      return entry.getTransaction().getOps().stream();
+    }
+    throw new IllegalStateException("Unknown log entry type: " + type);
+  }
+
+  private void scheduleSnapshots() {
+    if (snapshotInterval.getValue() > 0) {
+      schedulingService.doEvery(snapshotInterval, () -> {
+        // NOTE(review): only StorageException is caught. With the default scheduling service
+        // (scheduleWithFixedDelay), any other exception escaping here suppresses all later runs.
+        try {
+          snapshot();
+        } catch (StorageException e) {
+          if (e.getCause() == null) {
+            LOG.warn("StorageException when attempting to snapshot.", e);
+          } else {
+            LOG.warn(e.getMessage(), e.getCause());
+          }
+        }
+      });
+    }
+  }
+
+  @Override
+  public void snapshot() throws StorageException {
+    try {
+      doSnapshot();
+    } catch (CodingException e) {
+      throw new StorageException("Failed to encode a snapshot", e);
+    } catch (InvalidPositionException e) {
+      throw new StorageException("Saved snapshot but failed to truncate entries preceding it", e);
+    } catch (StreamAccessException e) {
+      throw new StorageException("Failed to create a snapshot", e);
+    }
+  }
+
+  @Timed("scheduler_log_snapshot_persist")
+  @Override
+  public void snapshotWith(Snapshot snapshot)
+      throws CodingException, InvalidPositionException, StreamAccessException {
+
+    openedStream().snapshot(snapshot);
+  }
+
+  /**
+   * Forces a snapshot of the storage state.
+   *
+   * @throws CodingException If there is a problem encoding the snapshot.
+   * @throws InvalidPositionException If the log stream cursor is invalid.
+   * @throws StreamAccessException If there is a problem writing the snapshot to the log stream.
+   */
+  @Timed("scheduler_log_snapshot")
+  void doSnapshot() throws CodingException, InvalidPositionException, StreamAccessException {
+    LOG.info("Creating snapshot.");
+    Snapshot snapshot = snapshotStore.createSnapshot();
+    snapshotWith(snapshot);
+    LOG.info("Snapshot complete. host attrs: {}, cron jobs: {}, quota confs: {}, tasks: {}, "
+            + "updates: {}",
+        snapshot.getHostAttributesSize(),
+        snapshot.getCronJobsSize(),
+        snapshot.getQuotaConfigurationsSize(),
+        snapshot.getTasksSize(),
+        snapshot.getJobUpdateDetailsSize());
+  }
+
+  /**
+   * A service that can schedule an action to be executed periodically.
+   */
+  @VisibleForTesting
+  interface SchedulingService {
+
+    /**
+     * Schedules an action to execute periodically.
+     *
+     * @param interval The time period to wait until running the {@code action} again.
+     * @param action The action to execute periodically.
+     */
+    void doEvery(Amount<Long, Time> interval, Runnable action);
+  }
+
+  /**
+   * Scheduling service backed by a single-threaded scheduled executor that is shut down via the
+   * shutdown registry.
+   */
+  private static class ScheduledExecutorSchedulingService implements SchedulingService {
+    private final ScheduledExecutorService scheduledExecutor;
+
+    ScheduledExecutorSchedulingService(ShutdownRegistry shutdownRegistry,
+        Amount<Long, Time> shutdownGracePeriod) {
+      scheduledExecutor = AsyncUtil.singleThreadLoggingScheduledExecutor("LogStorage-%d", LOG);
+      shutdownRegistry.addAction(() -> MoreExecutors.shutdownAndAwaitTermination(
+          scheduledExecutor,
+          shutdownGracePeriod.getValue(),
+          shutdownGracePeriod.getUnit().getTimeUnit()));
+    }
+
+    @Override
+    public void doEvery(Amount<Long, Time> interval, Runnable action) {
+      requireNonNull(interval);
+      requireNonNull(action);
+
+      // The first run happens one full interval after scheduling.
+      long delay = interval.getValue();
+      TimeUnit timeUnit = interval.getUnit().getTimeUnit();
+      scheduledExecutor.scheduleWithFixedDelay(action, delay, delay, timeUnit);
+    }
+  }
+
+  /**
+   * Configuration settings for log persistence.
+   */
+  public static class Settings {
+    private final Amount<Long, Time> shutdownGracePeriod;
+    private final Amount<Long, Time> snapshotInterval;
+
+    Settings(Amount<Long, Time> shutdownGracePeriod, Amount<Long, Time> snapshotInterval) {
+      this.shutdownGracePeriod = requireNonNull(shutdownGracePeriod);
+      this.snapshotInterval = requireNonNull(snapshotInterval);
+    }
+
+    public Amount<Long, Time> getShutdownGracePeriod() {
+      return shutdownGracePeriod;
+    }
+
+    public Amount<Long, Time> getSnapshotInterval() {
+      return snapshotInterval;
+    }
+  }
+}