You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/03/15 00:25:08 UTC
git commit: Refactor LogStorage to more cleanly separate replay and
write-behind modes, and require explicit implementation of mutate operations.
Repository: incubator-aurora
Updated Branches:
refs/heads/master 5d4e041fd -> 1713d0824
Refactor LogStorage to more cleanly separate replay and write-behind modes, and
require explicit implementation of mutate operations.
There are a few nice things falling out of this refactor:
- New methods on mutable store interfaces are not implicitly forwarded
(ForwardingStore does not implement mutable stores).
- Write ahead/behind behavior is more obvious in LogStorage
(i found the delegation by calling super tough to catch mistakes.)
- Callers with a handle on LogStorage don't have a means to invoke mutate calls
outside of a transaction (they only get access to mutable stores in write(),
which obviated testMutateRequiresWriteOperation)
Reviewed at https://reviews.apache.org/r/18487/
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/1713d082
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/1713d082
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/1713d082
Branch: refs/heads/master
Commit: 1713d082405e0f5adbc6872fbf45e5a5dda3996d
Parents: 5d4e041
Author: Bill Farner <wf...@apache.org>
Authored: Fri Mar 14 16:24:45 2014 -0700
Committer: Bill Farner <wf...@apache.org>
Committed: Fri Mar 14 16:24:45 2014 -0700
----------------------------------------------------------------------
.../scheduler/storage/ForwardingStore.java | 157 +--------
.../aurora/scheduler/storage/TaskStore.java | 2 +
.../scheduler/storage/log/LogStorage.java | 333 +++++-------------
.../storage/log/WriteAheadStorage.java | 343 +++++++++++++++++++
.../scheduler/storage/log/LogStorageTest.java | 14 +-
5 files changed, 457 insertions(+), 392 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1713d082/src/main/java/org/apache/aurora/scheduler/storage/ForwardingStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/ForwardingStore.java b/src/main/java/org/apache/aurora/scheduler/storage/ForwardingStore.java
index 8758eb7..a67f9d8 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/ForwardingStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/ForwardingStore.java
@@ -20,12 +20,10 @@ import java.util.Set;
import javax.annotation.Nullable;
-import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableSet;
import org.apache.aurora.gen.HostAttributes;
-import org.apache.aurora.gen.MaintenanceMode;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
@@ -33,7 +31,6 @@ import org.apache.aurora.scheduler.storage.entities.ILock;
import org.apache.aurora.scheduler.storage.entities.ILockKey;
import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -42,26 +39,23 @@ import static com.google.common.base.Preconditions.checkNotNull;
* an existing storage system.
*/
public class ForwardingStore implements
- Storage,
- SchedulerStore.Mutable,
- JobStore.Mutable,
- TaskStore.Mutable,
- LockStore.Mutable,
- QuotaStore.Mutable,
- AttributeStore.Mutable {
-
- private final Storage storage;
- private final SchedulerStore.Mutable schedulerStore;
- private final JobStore.Mutable jobStore;
- private final TaskStore.Mutable taskStore;
- private final LockStore.Mutable lockStore;
- private final QuotaStore.Mutable quotaStore;
- private final AttributeStore.Mutable attributeStore;
+ SchedulerStore,
+ JobStore,
+ TaskStore,
+ LockStore,
+ QuotaStore,
+ AttributeStore {
+
+ private final SchedulerStore schedulerStore;
+ private final JobStore jobStore;
+ private final TaskStore taskStore;
+ private final LockStore lockStore;
+ private final QuotaStore quotaStore;
+ private final AttributeStore attributeStore;
/**
* Creats a new forwarding store that delegates to the providing default stores.
*
- * @param storage Delegate.
* @param schedulerStore Delegate.
* @param jobStore Delegate.
* @param taskStore Delegate.
@@ -70,15 +64,13 @@ public class ForwardingStore implements
* @param attributeStore Delegate.
*/
public ForwardingStore(
- Storage storage,
- SchedulerStore.Mutable schedulerStore,
- JobStore.Mutable jobStore,
- TaskStore.Mutable taskStore,
- LockStore.Mutable lockStore,
- QuotaStore.Mutable quotaStore,
- AttributeStore.Mutable attributeStore) {
+ SchedulerStore schedulerStore,
+ JobStore jobStore,
+ TaskStore taskStore,
+ LockStore lockStore,
+ QuotaStore quotaStore,
+ AttributeStore attributeStore) {
- this.storage = checkNotNull(storage);
this.schedulerStore = checkNotNull(schedulerStore);
this.jobStore = checkNotNull(jobStore);
this.taskStore = checkNotNull(taskStore);
@@ -88,29 +80,6 @@ public class ForwardingStore implements
}
@Override
- public <T, E extends Exception> T consistentRead(Work<T, E> work) throws StorageException, E {
- return storage.consistentRead(work);
- }
-
- @Override
- public <T, E extends Exception> T weaklyConsistentRead(Work<T, E> work)
- throws StorageException, E {
- return storage.weaklyConsistentRead(work);
- }
-
- @Override
- public <T, E extends Exception> T write(MutateWork<T, E> work)
- throws StorageException, E {
-
- return storage.write(work);
- }
-
- @Override
- public void saveFrameworkId(String frameworkId) {
- schedulerStore.saveFrameworkId(frameworkId);
- }
-
- @Override
@Nullable
public String fetchFrameworkId() {
return schedulerStore.fetchFrameworkId();
@@ -127,54 +96,11 @@ public class ForwardingStore implements
}
@Override
- public void saveAcceptedJob(String managerId, IJobConfiguration jobConfig) {
- jobStore.saveAcceptedJob(managerId, jobConfig);
- }
-
- @Override
- public void removeJob(IJobKey jobKey) {
- jobStore.removeJob(jobKey);
- }
-
- @Override
- public void deleteJobs() {
- jobStore.deleteJobs();
- }
-
- @Override
public Set<String> fetchManagerIds() {
return jobStore.fetchManagerIds();
}
@Override
- public void saveTasks(Set<IScheduledTask> tasks) throws IllegalStateException {
- taskStore.saveTasks(tasks);
- }
-
- @Override
- public void deleteAllTasks() {
- taskStore.deleteAllTasks();
- }
-
- @Override
- public void deleteTasks(Set<String> taskIds) {
- taskStore.deleteTasks(taskIds);
- }
-
- @Override
- public ImmutableSet<IScheduledTask> mutateTasks(
- Query.Builder query,
- Function<IScheduledTask, IScheduledTask> mutator) {
-
- return taskStore.mutateTasks(query, mutator);
- }
-
- @Override
- public boolean unsafeModifyInPlace(String taskId, ITaskConfig taskConfiguration) {
- return taskStore.unsafeModifyInPlace(taskId, taskConfiguration);
- }
-
- @Override
public ImmutableSet<IScheduledTask> fetchTasks(Query.Builder querySupplier) {
return taskStore.fetchTasks(querySupplier);
}
@@ -190,51 +116,16 @@ public class ForwardingStore implements
}
@Override
- public void saveLock(ILock lock) {
- lockStore.saveLock(lock);
- }
-
- @Override
- public void removeLock(ILockKey lockKey) {
- lockStore.removeLock(lockKey);
- }
-
- @Override
- public void deleteLocks() {
- lockStore.deleteLocks();
- }
-
- @Override
public Map<String, IResourceAggregate> fetchQuotas() {
return quotaStore.fetchQuotas();
}
@Override
- public void removeQuota(String role) {
- quotaStore.removeQuota(role);
- }
-
- @Override
- public void deleteQuotas() {
- quotaStore.deleteQuotas();
- }
-
- @Override
- public void saveQuota(String role, IResourceAggregate quota) {
- quotaStore.saveQuota(role, quota);
- }
-
- @Override
public Optional<IResourceAggregate> fetchQuota(String role) {
return quotaStore.fetchQuota(role);
}
@Override
- public void saveHostAttributes(HostAttributes hostAttribute) {
- attributeStore.saveHostAttributes(hostAttribute);
- }
-
- @Override
public Optional<HostAttributes> getHostAttributes(String host) {
return attributeStore.getHostAttributes(host);
}
@@ -243,14 +134,4 @@ public class ForwardingStore implements
public Set<HostAttributes> getHostAttributes() {
return attributeStore.getHostAttributes();
}
-
- @Override
- public void deleteHostAttributes() {
- attributeStore.deleteHostAttributes();
- }
-
- @Override
- public boolean setMaintenanceMode(String host, MaintenanceMode mode) {
- return attributeStore.setMaintenanceMode(host, mode);
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1713d082/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java b/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
index 3d0ff2d..4dcc5ae 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
@@ -58,6 +58,8 @@ public interface TaskStore {
/**
* Removes all tasks from the store.
+ * TODO(wfarner): Move this and other mass-delete methods to an interface that is only
+ * accessible by SnapshotStoreImpl.
*/
void deleteAllTasks();
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1713d082/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
index 3ccf800..a1a0b42 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
@@ -21,8 +21,6 @@ import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Date;
-import java.util.Map;
-import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -32,12 +30,6 @@ import java.util.logging.Logger;
import javax.inject.Inject;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
import com.google.inject.BindingAnnotation;
import com.twitter.common.application.ShutdownRegistry;
import com.twitter.common.base.Closure;
@@ -47,30 +39,17 @@ import com.twitter.common.quantity.Time;
import com.twitter.common.util.concurrent.ExecutorServiceShutdown;
import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
-import org.apache.aurora.gen.HostAttributes;
-import org.apache.aurora.gen.MaintenanceMode;
import org.apache.aurora.gen.storage.LogEntry;
import org.apache.aurora.gen.storage.Op;
-import org.apache.aurora.gen.storage.RemoveJob;
-import org.apache.aurora.gen.storage.RemoveLock;
-import org.apache.aurora.gen.storage.RemoveQuota;
-import org.apache.aurora.gen.storage.RemoveTasks;
import org.apache.aurora.gen.storage.RewriteTask;
import org.apache.aurora.gen.storage.SaveAcceptedJob;
-import org.apache.aurora.gen.storage.SaveFrameworkId;
-import org.apache.aurora.gen.storage.SaveHostAttributes;
-import org.apache.aurora.gen.storage.SaveLock;
import org.apache.aurora.gen.storage.SaveQuota;
-import org.apache.aurora.gen.storage.SaveTasks;
import org.apache.aurora.gen.storage.Snapshot;
-import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.SchedulerException;
-import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.log.Log.Stream.InvalidPositionException;
import org.apache.aurora.scheduler.log.Log.Stream.StreamAccessException;
import org.apache.aurora.scheduler.storage.AttributeStore;
import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
-import org.apache.aurora.scheduler.storage.ForwardingStore;
import org.apache.aurora.scheduler.storage.JobStore;
import org.apache.aurora.scheduler.storage.LockStore;
import org.apache.aurora.scheduler.storage.QuotaStore;
@@ -127,8 +106,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
* <p>If the op fails to apply to local storage we will never write the op to the log and if the op
* fails to apply to the log, it'll throw and abort the local storage transaction as well.
*/
-public class LogStorage extends ForwardingStore
- implements NonVolatileStorage, DistributedSnapshotStore {
+public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore {
/**
* A service that can schedule an action to be executed periodically.
@@ -145,6 +123,27 @@ public class LogStorage extends ForwardingStore
void doEvery(Amount<Long, Time> interval, Runnable action);
}
+ /**
+ * A maintainer for context about open transactions. Assumes that an external entity is
+ * responsible for opening and closing transactions.
+ */
+ interface TransactionManager {
+
+ /**
+ * Checks whether there is an open transaction.
+ *
+ * @return {@code true} if there is an open transaction, {@code false} otherwise.
+ */
+ boolean hasActiveTransaction();
+
+ /**
+ * Adds an operation to the existing transaction.
+ *
+ * @param op Operation to include in the existing transaction.
+ */
+ void log(Op op);
+ }
+
private static class ScheduledExecutorSchedulingService implements SchedulingService {
private final ScheduledExecutorService scheduledExecutor;
@@ -172,49 +171,24 @@ public class LogStorage extends ForwardingStore
private final SchedulingService schedulingService;
private final SnapshotStore<Snapshot> snapshotStore;
private final Amount<Long, Time> snapshotInterval;
+ private final Storage writeBehindStorage;
+ private final SchedulerStore.Mutable writeBehindSchedulerStore;
+ private final JobStore.Mutable writeBehindJobStore;
+ private final TaskStore.Mutable writeBehindTaskStore;
+ private final LockStore.Mutable writeBehindLockStore;
+ private final QuotaStore.Mutable writeBehindQuotaStore;
+ private final AttributeStore.Mutable writeBehindAttributeStore;
private StreamManager streamManager;
+ private final WriteAheadStorage writeAheadStorage;
+ // TODO(wfarner): It should be possible to remove this flag now, since all call stacks when
+ // recovering are controlled at this layer (they're all calls to Mutable store implementations).
+ // The more involved change is changing SnapshotStore to accept a Mutable store provider to
+ // avoid a call to Storage.write() when we replay a Snapshot.
private boolean recovered = false;
private StreamTransaction transaction = null;
- private final MutableStoreProvider logStoreProvider = new MutableStoreProvider() {
- @Override
- public SchedulerStore.Mutable getSchedulerStore() {
- return LogStorage.this;
- }
-
- @Override
- public JobStore.Mutable getJobStore() {
- return LogStorage.this;
- }
-
- @Override
- public TaskStore getTaskStore() {
- return LogStorage.this;
- }
-
- @Override
- public TaskStore.Mutable getUnsafeTaskStore() {
- return LogStorage.this;
- }
-
- @Override
- public LockStore.Mutable getLockStore() {
- return LogStorage.this;
- }
-
- @Override
- public QuotaStore.Mutable getQuotaStore() {
- return LogStorage.this;
- }
-
- @Override
- public AttributeStore.Mutable getAttributeStore() {
- return LogStorage.this;
- }
- };
-
/**
* Identifies the grace period to give in-process snapshots and checkpoints to complete during
* shutdown.
@@ -273,7 +247,7 @@ public class LogStorage extends ForwardingStore
SchedulingService schedulingService,
SnapshotStore<Snapshot> snapshotStore,
Amount<Long, Time> snapshotInterval,
- Storage storage,
+ Storage delegateStorage,
SchedulerStore.Mutable schedulerStore,
JobStore.Mutable jobStore,
TaskStore.Mutable taskStore,
@@ -281,11 +255,41 @@ public class LogStorage extends ForwardingStore
QuotaStore.Mutable quotaStore,
AttributeStore.Mutable attributeStore) {
- super(storage, schedulerStore, jobStore, taskStore, lockStore, quotaStore, attributeStore);
this.logManager = checkNotNull(logManager);
this.schedulingService = checkNotNull(schedulingService);
this.snapshotStore = checkNotNull(snapshotStore);
this.snapshotInterval = checkNotNull(snapshotInterval);
+
+ // Log storage has two distinct operating modes: pre- and post-recovery. When recovering,
+ // we write directly to the writeBehind stores since we are replaying what's already persisted.
+ // After that, all writes must succeed in the distributed log before they may be considered
+ // successful.
+ this.writeBehindStorage = checkNotNull(delegateStorage);
+ this.writeBehindSchedulerStore = checkNotNull(schedulerStore);
+ this.writeBehindJobStore = checkNotNull(jobStore);
+ this.writeBehindTaskStore = checkNotNull(taskStore);
+ this.writeBehindLockStore = checkNotNull(lockStore);
+ this.writeBehindQuotaStore = checkNotNull(quotaStore);
+ this.writeBehindAttributeStore = checkNotNull(attributeStore);
+ TransactionManager transactionManager = new TransactionManager() {
+ @Override
+ public boolean hasActiveTransaction() {
+ return transaction != null;
+ }
+
+ @Override
+ public void log(Op op) {
+ transaction.add(op);
+ }
+ };
+ this.writeAheadStorage = new WriteAheadStorage(
+ transactionManager,
+ schedulerStore,
+ jobStore,
+ taskStore,
+ lockStore,
+ quotaStore,
+ attributeStore);
}
@Override
@@ -383,52 +387,57 @@ public class LogStorage extends ForwardingStore
private void replayOp(Op op) {
switch (op.getSetField()) {
case SAVE_FRAMEWORK_ID:
- saveFrameworkId(op.getSaveFrameworkId().getId());
+ writeBehindSchedulerStore.saveFrameworkId(op.getSaveFrameworkId().getId());
break;
case SAVE_ACCEPTED_JOB:
SaveAcceptedJob acceptedJob = op.getSaveAcceptedJob();
- saveAcceptedJob(
+ writeBehindJobStore.saveAcceptedJob(
acceptedJob.getManagerId(),
IJobConfiguration.build(acceptedJob.getJobConfig()));
break;
case REMOVE_JOB:
- removeJob(IJobKey.build(op.getRemoveJob().getJobKey()));
+ writeBehindJobStore.removeJob(IJobKey.build(op.getRemoveJob().getJobKey()));
break;
case SAVE_TASKS:
- saveTasks(IScheduledTask.setFromBuilders(op.getSaveTasks().getTasks()));
+ writeBehindTaskStore.saveTasks(
+ IScheduledTask.setFromBuilders(op.getSaveTasks().getTasks()));
break;
case REWRITE_TASK:
RewriteTask rewriteTask = op.getRewriteTask();
- unsafeModifyInPlace(rewriteTask.getTaskId(), ITaskConfig.build(rewriteTask.getTask()));
+ writeBehindTaskStore.unsafeModifyInPlace(
+ rewriteTask.getTaskId(),
+ ITaskConfig.build(rewriteTask.getTask()));
break;
case REMOVE_TASKS:
- deleteTasks(op.getRemoveTasks().getTaskIds());
+ writeBehindTaskStore.deleteTasks(op.getRemoveTasks().getTaskIds());
break;
case SAVE_QUOTA:
SaveQuota saveQuota = op.getSaveQuota();
- saveQuota(saveQuota.getRole(), IResourceAggregate.build(saveQuota.getQuota()));
+ writeBehindQuotaStore.saveQuota(
+ saveQuota.getRole(),
+ IResourceAggregate.build(saveQuota.getQuota()));
break;
case REMOVE_QUOTA:
- removeQuota(op.getRemoveQuota().getRole());
+ writeBehindQuotaStore.removeQuota(op.getRemoveQuota().getRole());
break;
case SAVE_HOST_ATTRIBUTES:
- saveHostAttributes(op.getSaveHostAttributes().hostAttributes);
+ writeBehindAttributeStore.saveHostAttributes(op.getSaveHostAttributes().hostAttributes);
break;
case SAVE_LOCK:
- saveLock(ILock.build(op.getSaveLock().getLock()));
+ writeBehindLockStore.saveLock(ILock.build(op.getSaveLock().getLock()));
break;
case REMOVE_LOCK:
- removeLock(ILockKey.build(op.getRemoveLock().getLockKey()));
+ writeBehindLockStore.removeLock(ILockKey.build(op.getRemoveLock().getLockKey()));
break;
default:
@@ -464,7 +473,7 @@ public class LogStorage extends ForwardingStore
*/
@Timed("scheduler_log_snapshot")
void doSnapshot() throws CodingException, InvalidPositionException, StreamAccessException {
- super.write(new MutateWork.NoResult<CodingException>() {
+ write(new MutateWork.NoResult<CodingException>() {
@Override
protected void execute(MutableStoreProvider unused)
throws CodingException, InvalidPositionException, StreamAccessException {
@@ -489,21 +498,21 @@ public class LogStorage extends ForwardingStore
// We don't want to use the log when recovering from it, we just want to update the underlying
// store - so pass mutations straight through to the underlying storage.
if (!recovered) {
- return super.write(work);
+ return writeBehindStorage.write(work);
}
// The log stream transaction has already been set up so we just need to delegate with our
// store provider so any mutations performed by work get logged.
if (transaction != null) {
- return work.apply(logStoreProvider);
+ return work.apply(writeAheadStorage);
}
transaction = streamManager.startTransaction();
try {
- return super.write(new MutateWork<T, E>() {
+ return writeBehindStorage.write(new MutateWork<T, E>() {
@Override
public T apply(MutableStoreProvider unused) throws E {
- T result = work.apply(logStoreProvider);
+ T result = work.apply(writeAheadStorage);
try {
transaction.commit();
} catch (CodingException e) {
@@ -521,164 +530,16 @@ public class LogStorage extends ForwardingStore
}
}
- @Timed("scheduler_log_save_framework_id")
@Override
- public void saveFrameworkId(final String frameworkId) {
- checkNotNull(frameworkId);
-
- log(Op.saveFrameworkId(new SaveFrameworkId(frameworkId)));
- super.saveFrameworkId(frameworkId);
+ public <T, E extends Exception> T consistentRead(Work<T, E> work) throws StorageException, E {
+ return writeBehindStorage.consistentRead(work);
}
- @Timed("scheduler_log_job_save")
@Override
- public void saveAcceptedJob(final String managerId, final IJobConfiguration jobConfig) {
- checkNotNull(managerId);
- checkNotNull(jobConfig);
-
- log(Op.saveAcceptedJob(new SaveAcceptedJob(managerId, jobConfig.newBuilder())));
- super.saveAcceptedJob(managerId, jobConfig);
- }
-
- @Timed("scheduler_log_job_remove")
- @Override
- public void removeJob(final IJobKey jobKey) {
- checkNotNull(jobKey);
-
- log(Op.removeJob(new RemoveJob().setJobKey(jobKey.newBuilder())));
- super.removeJob(jobKey);
- }
-
- @Timed("scheduler_log_tasks_save")
- @Override
- public void saveTasks(final Set<IScheduledTask> newTasks) throws IllegalStateException {
- checkNotNull(newTasks);
-
- log(Op.saveTasks(new SaveTasks(IScheduledTask.toBuildersSet(newTasks))));
- super.saveTasks(newTasks);
- }
-
- @Override
- public void deleteAllTasks() {
- Query.Builder query = Query.unscoped();
- Set<String> ids = FluentIterable.from(fetchTasks(query))
- .transform(Tasks.SCHEDULED_TO_ID)
- .toSet();
- deleteTasks(ids);
- }
-
- @Timed("scheduler_log_tasks_remove")
- @Override
- public void deleteTasks(final Set<String> taskIds) {
- checkNotNull(taskIds);
-
- log(Op.removeTasks(new RemoveTasks(taskIds)));
- super.deleteTasks(taskIds);
- }
-
- @Timed("scheduler_log_tasks_mutate")
- @Override
- public ImmutableSet<IScheduledTask> mutateTasks(
- final Query.Builder query,
- final Function<IScheduledTask, IScheduledTask> mutator) {
-
- checkNotNull(query);
- checkNotNull(mutator);
-
- ImmutableSet<IScheduledTask> mutated = super.mutateTasks(query, mutator);
-
- Map<String, IScheduledTask> tasksById = Tasks.mapById(mutated);
- if (LOG.isLoggable(Level.FINE)) {
- LOG.fine("Storing updated tasks to log: "
- + Maps.transformValues(tasksById, Tasks.GET_STATUS));
- }
-
- // TODO(William Farner): Avoid writing an op when mutated is empty.
- log(Op.saveTasks(new SaveTasks(IScheduledTask.toBuildersSet(mutated))));
- return mutated;
- }
-
- @Timed("scheduler_log_unsafe_modify_in_place")
- @Override
- public boolean unsafeModifyInPlace(final String taskId, final ITaskConfig taskConfiguration) {
- checkNotNull(taskId);
- checkNotNull(taskConfiguration);
-
- boolean mutated = super.unsafeModifyInPlace(taskId, taskConfiguration);
- if (mutated) {
- log(Op.rewriteTask(new RewriteTask(taskId, taskConfiguration.newBuilder())));
- }
- return mutated;
- }
-
- @Timed("scheduler_log_quota_remove")
- @Override
- public void removeQuota(final String role) {
- checkNotNull(role);
-
- log(Op.removeQuota(new RemoveQuota(role)));
- super.removeQuota(role);
- }
-
- @Timed("scheduler_log_quota_save")
- @Override
- public void saveQuota(final String role, final IResourceAggregate quota) {
- checkNotNull(role);
- checkNotNull(quota);
-
- log(Op.saveQuota(new SaveQuota(role, quota.newBuilder())));
- super.saveQuota(role, quota);
- }
-
- @Timed("scheduler_save_host_attribute")
- @Override
- public void saveHostAttributes(final HostAttributes attrs) {
- checkNotNull(attrs);
-
- // Pass the updated attributes upstream, and then check if the stored value changes.
- // We do this since different parts of the system write partial HostAttributes objects
- // and they are merged together internally.
- // TODO(William Farner): Split out a separate method
- // saveAttributes(String host, Iterable<Attributes>) to simplify this.
- Optional<HostAttributes> saved = getHostAttributes(attrs.getHost());
- super.saveHostAttributes(attrs);
- Optional<HostAttributes> updated = getHostAttributes(attrs.getHost());
- if (!saved.equals(updated)) {
- log(Op.saveHostAttributes(new SaveHostAttributes(updated.get())));
- }
- }
-
- @Timed("scheduler_lock_save")
- @Override
- public void saveLock(final ILock lock) {
- checkNotNull(lock);
-
- log(Op.saveLock(new SaveLock(lock.newBuilder())));
- super.saveLock(lock);
- }
-
- @Timed("scheduler_lock_remove")
- @Override
- public void removeLock(final ILockKey lockKey) {
- checkNotNull(lockKey);
-
- log(Op.removeLock(new RemoveLock(lockKey.newBuilder())));
- super.removeLock(lockKey);
- }
+ public <T, E extends Exception> T weaklyConsistentRead(Work<T, E> work)
+ throws StorageException, E {
- @Override
- public boolean setMaintenanceMode(final String host, final MaintenanceMode mode) {
- checkNotNull(host);
- checkNotNull(mode);
-
- Optional<HostAttributes> saved = getHostAttributes(host);
- if (saved.isPresent()) {
- HostAttributes attributes = saved.get().setMode(mode);
- log(Op.saveHostAttributes(new SaveHostAttributes(attributes)));
- super.saveHostAttributes(attributes);
- return true;
- }
- return false;
+ return writeBehindStorage.weaklyConsistentRead(work);
}
@Override
@@ -693,14 +554,4 @@ public class LogStorage extends ForwardingStore
throw new StorageException("Failed to create a snapshot", e);
}
}
-
- private void log(Op op) {
- Preconditions.checkState(
- !recovered || (transaction != null),
- "Mutating operations must be done during recovery or within a transaction.");
-
- if (recovered) {
- transaction.add(op);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1713d082/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
new file mode 100644
index 0000000..e777e59
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
@@ -0,0 +1,343 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.log;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.twitter.common.inject.TimedInterceptor.Timed;
+
+import org.apache.aurora.gen.HostAttributes;
+import org.apache.aurora.gen.MaintenanceMode;
+import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.gen.storage.RemoveJob;
+import org.apache.aurora.gen.storage.RemoveLock;
+import org.apache.aurora.gen.storage.RemoveQuota;
+import org.apache.aurora.gen.storage.RemoveTasks;
+import org.apache.aurora.gen.storage.RewriteTask;
+import org.apache.aurora.gen.storage.SaveAcceptedJob;
+import org.apache.aurora.gen.storage.SaveFrameworkId;
+import org.apache.aurora.gen.storage.SaveHostAttributes;
+import org.apache.aurora.gen.storage.SaveLock;
+import org.apache.aurora.gen.storage.SaveQuota;
+import org.apache.aurora.gen.storage.SaveTasks;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.storage.AttributeStore;
+import org.apache.aurora.scheduler.storage.ForwardingStore;
+import org.apache.aurora.scheduler.storage.JobStore;
+import org.apache.aurora.scheduler.storage.LockStore;
+import org.apache.aurora.scheduler.storage.QuotaStore;
+import org.apache.aurora.scheduler.storage.SchedulerStore;
+import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import org.apache.aurora.scheduler.storage.TaskStore;
+import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.ILock;
+import org.apache.aurora.scheduler.storage.entities.ILockKey;
+import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import static org.apache.aurora.scheduler.storage.log.LogStorage.TransactionManager;
+
+/**
+ * Mutable stores implementation that translates all operations to {@link Op}s (which are passed
+ * to a provided {@link TransactionManager}) before forwarding the operations to delegate mutable
+ * stores.
+ */
+class WriteAheadStorage extends ForwardingStore implements
+ MutableStoreProvider,
+ SchedulerStore.Mutable,
+ JobStore.Mutable,
+ TaskStore.Mutable,
+ LockStore.Mutable,
+ QuotaStore.Mutable,
+ AttributeStore.Mutable {
+
+ private static final Logger LOG = Logger.getLogger(WriteAheadStorage.class.getName());
+
+ private final TransactionManager transactionManager;
+ private final SchedulerStore.Mutable schedulerStore;
+ private final JobStore.Mutable jobStore;
+ private final TaskStore.Mutable taskStore;
+ private final LockStore.Mutable lockStore;
+ private final QuotaStore.Mutable quotaStore;
+ private final AttributeStore.Mutable attributeStore;
+
+ /**
+ * Creates a new write-ahead storage that delegates to the providing default stores.
+ *
+ * @param transactionManager External controller for transaction operations.
+ * @param schedulerStore Delegate.
+ * @param jobStore Delegate.
+ * @param taskStore Delegate.
+ * @param lockStore Delegate.
+ * @param quotaStore Delegate.
+ * @param attributeStore Delegate.
+ */
+ WriteAheadStorage(
+ TransactionManager transactionManager,
+ SchedulerStore.Mutable schedulerStore,
+ JobStore.Mutable jobStore,
+ TaskStore.Mutable taskStore,
+ LockStore.Mutable lockStore,
+ QuotaStore.Mutable quotaStore,
+ AttributeStore.Mutable attributeStore) {
+
+ super(schedulerStore, jobStore, taskStore, lockStore, quotaStore, attributeStore);
+
+ this.transactionManager = checkNotNull(transactionManager);
+ this.schedulerStore = checkNotNull(schedulerStore);
+ this.jobStore = checkNotNull(jobStore);
+ this.taskStore = checkNotNull(taskStore);
+ this.lockStore = checkNotNull(lockStore);
+ this.quotaStore = checkNotNull(quotaStore);
+ this.attributeStore = checkNotNull(attributeStore);
+ }
+
+ private void log(Op op) {
+ Preconditions.checkState(
+ transactionManager.hasActiveTransaction(),
+ "Mutating operations must be within a transaction.");
+ transactionManager.log(op);
+ }
+
+ @Timed("scheduler_log_save_framework_id")
+ @Override
+ public void saveFrameworkId(final String frameworkId) {
+ checkNotNull(frameworkId);
+
+ log(Op.saveFrameworkId(new SaveFrameworkId(frameworkId)));
+ schedulerStore.saveFrameworkId(frameworkId);
+ }
+
+ @Timed("scheduler_log_unsafe_modify_in_place")
+ @Override
+ public boolean unsafeModifyInPlace(final String taskId, final ITaskConfig taskConfiguration) {
+ checkNotNull(taskId);
+ checkNotNull(taskConfiguration);
+
+ boolean mutated = taskStore.unsafeModifyInPlace(taskId, taskConfiguration);
+ if (mutated) {
+ log(Op.rewriteTask(new RewriteTask(taskId, taskConfiguration.newBuilder())));
+ }
+ return mutated;
+ }
+
+ @Timed("scheduler_log_tasks_remove")
+ @Override
+ public void deleteTasks(final Set<String> taskIds) {
+ checkNotNull(taskIds);
+
+ log(Op.removeTasks(new RemoveTasks(taskIds)));
+ taskStore.deleteTasks(taskIds);
+ }
+
+ @Timed("scheduler_log_tasks_save")
+ @Override
+ public void saveTasks(final Set<IScheduledTask> newTasks) {
+ checkNotNull(newTasks);
+
+ log(Op.saveTasks(new SaveTasks(IScheduledTask.toBuildersSet(newTasks))));
+ taskStore.saveTasks(newTasks);
+ }
+
+ @Timed("scheduler_log_tasks_mutate")
+ @Override
+ public ImmutableSet<IScheduledTask> mutateTasks(
+ final Query.Builder query,
+ final Function<IScheduledTask, IScheduledTask> mutator) {
+
+ checkNotNull(query);
+ checkNotNull(mutator);
+
+ ImmutableSet<IScheduledTask> mutated = taskStore.mutateTasks(query, mutator);
+
+ Map<String, IScheduledTask> tasksById = Tasks.mapById(mutated);
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.fine("Storing updated tasks to log: "
+ + Maps.transformValues(tasksById, Tasks.GET_STATUS));
+ }
+
+ // TODO(William Farner): Avoid writing an op when mutated is empty.
+ log(Op.saveTasks(new SaveTasks(IScheduledTask.toBuildersSet(mutated))));
+ return mutated;
+ }
+
+ @Timed("scheduler_log_quota_save")
+ @Override
+ public void saveQuota(final String role, final IResourceAggregate quota) {
+ checkNotNull(role);
+ checkNotNull(quota);
+
+ log(Op.saveQuota(new SaveQuota(role, quota.newBuilder())));
+ quotaStore.saveQuota(role, quota);
+ }
+
+ @Timed("scheduler_save_host_attribute")
+ @Override
+ public void saveHostAttributes(final HostAttributes attrs) {
+ checkNotNull(attrs);
+
+ // Pass the updated attributes upstream, and then check if the stored value changes.
+ // We do this since different parts of the system write partial HostAttributes objects
+ // and they are merged together internally.
+ // TODO(William Farner): Split out a separate method
+ // saveAttributes(String host, Iterable<Attributes>) to simplify this.
+ Optional<HostAttributes> saved = getHostAttributes(attrs.getHost());
+ attributeStore.saveHostAttributes(attrs);
+ Optional<HostAttributes> updated = getHostAttributes(attrs.getHost());
+ if (!saved.equals(updated)) {
+ log(Op.saveHostAttributes(new SaveHostAttributes(updated.get())));
+ }
+ }
+
+ @Timed("scheduler_log_job_remove")
+ @Override
+ public void removeJob(final IJobKey jobKey) {
+ checkNotNull(jobKey);
+
+ log(Op.removeJob(new RemoveJob().setJobKey(jobKey.newBuilder())));
+ jobStore.removeJob(jobKey);
+ }
+
+ @Timed("scheduler_log_job_save")
+ @Override
+ public void saveAcceptedJob(final String managerId, final IJobConfiguration jobConfig) {
+ checkNotNull(managerId);
+ checkNotNull(jobConfig);
+
+ log(Op.saveAcceptedJob(new SaveAcceptedJob(managerId, jobConfig.newBuilder())));
+ jobStore.saveAcceptedJob(managerId, jobConfig);
+ }
+
+ @Timed("scheduler_log_quota_remove")
+ @Override
+ public void removeQuota(final String role) {
+ checkNotNull(role);
+
+ log(Op.removeQuota(new RemoveQuota(role)));
+ quotaStore.removeQuota(role);
+ }
+
+ @Timed("scheduler_lock_save")
+ @Override
+ public void saveLock(final ILock lock) {
+ checkNotNull(lock);
+
+ log(Op.saveLock(new SaveLock(lock.newBuilder())));
+ lockStore.saveLock(lock);
+ }
+
+ @Timed("scheduler_lock_remove")
+ @Override
+ public void removeLock(final ILockKey lockKey) {
+ checkNotNull(lockKey);
+
+ log(Op.removeLock(new RemoveLock(lockKey.newBuilder())));
+ lockStore.removeLock(lockKey);
+ }
+
+ @Override
+ public void deleteAllTasks() {
+ throw new UnsupportedOperationException(
+ "Unsupported since casual storage users should never be doing this.");
+ }
+
+ @Override
+ public void deleteHostAttributes() {
+ throw new UnsupportedOperationException(
+ "Unsupported since casual storage users should never be doing this.");
+ }
+
+ @Override
+ public void deleteJobs() {
+ throw new UnsupportedOperationException(
+ "Unsupported since casual storage users should never be doing this.");
+ }
+
+ @Override
+ public void deleteQuotas() {
+ throw new UnsupportedOperationException(
+ "Unsupported since casual storage users should never be doing this.");
+ }
+
+ @Override
+ public void deleteLocks() {
+ throw new UnsupportedOperationException(
+ "Unsupported since casual storage users should never be doing this.");
+ }
+
+ @Override
+ public boolean setMaintenanceMode(final String host, final MaintenanceMode mode) {
+ checkNotNull(host);
+ checkNotNull(mode);
+
+ Optional<HostAttributes> saved = getHostAttributes(host);
+ if (saved.isPresent()) {
+ HostAttributes attributes = saved.get().setMode(mode);
+ log(Op.saveHostAttributes(new SaveHostAttributes(attributes)));
+ attributeStore.saveHostAttributes(attributes);
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public SchedulerStore.Mutable getSchedulerStore() {
+ return this;
+ }
+
+ @Override
+ public JobStore.Mutable getJobStore() {
+ return this;
+ }
+
+ @Override
+ public TaskStore.Mutable getUnsafeTaskStore() {
+ return this;
+ }
+
+ @Override
+ public LockStore.Mutable getLockStore() {
+ return this;
+ }
+
+ @Override
+ public QuotaStore.Mutable getQuotaStore() {
+ return this;
+ }
+
+ @Override
+ public AttributeStore.Mutable getAttributeStore() {
+ return this;
+ }
+
+ @Override
+ public TaskStore getTaskStore() {
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1713d082/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
index 8b26cc0..3437743 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
@@ -558,7 +558,6 @@ public class LogStorageTest extends EasyMockTest {
@Override
protected void setupExpectations() throws Exception {
storageUtil.expectWriteOperation();
- expect(storageUtil.taskStore.fetchTasks(Query.unscoped())).andReturn(ImmutableSet.of(task));
storageUtil.taskStore.deleteTasks(taskIds);
streamMatcher.expectTransaction(Op.removeTasks(new RemoveTasks(taskIds)))
.andReturn(position);
@@ -566,7 +565,7 @@ public class LogStorageTest extends EasyMockTest {
@Override
protected void performMutations(MutableStoreProvider storeProvider) {
- storeProvider.getUnsafeTaskStore().deleteAllTasks();
+ storeProvider.getUnsafeTaskStore().deleteTasks(taskIds);
}
}.run();
}
@@ -714,17 +713,6 @@ public class LogStorageTest extends EasyMockTest {
}.run();
}
- @Test(expected = IllegalStateException.class)
- public void testMutateRequiresWriteOperation() throws Exception {
- new StorageTestFixture() {
-
- @Override
- protected void runTest() {
- logStorage.deleteTasks(ImmutableSet.of("a"));
- }
- }.run();
- }
-
private LogEntry createTransaction(Op... ops) {
return LogEntry.transaction(
new Transaction(ImmutableList.copyOf(ops), storageConstants.CURRENT_SCHEMA_VERSION));