You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2013/12/31 22:20:23 UTC
[30/51] [partial] Rename twitter* and com.twitter to apache and
org.apache directories to preserve all file history before the refactor.
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
new file mode 100644
index 0000000..fdc9ae7
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
@@ -0,0 +1,322 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.storage;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+import com.google.inject.BindingAnnotation;
+
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.base.SchedulerException;
+import com.twitter.aurora.scheduler.storage.entities.IQuota;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+
+/**
+ * Manages scheduler storage operations providing an interface to perform atomic changes.
+ */
+public interface Storage {
+
+ /**
+ * Gives a unit of {@link Work} access to each of the individual stores.
+ * <p>
+ * Every accessor here is typed to the base store interface; {@link MutableStoreProvider}
+ * narrows most of them to the corresponding {@code Mutable} variant.
+ */
+ interface StoreProvider {
+ SchedulerStore getSchedulerStore();
+ JobStore getJobStore();
+ TaskStore getTaskStore();
+ LockStore getLockStore();
+ QuotaStore getQuotaStore();
+ AttributeStore getAttributeStore();
+ }
+
+ /**
+ * The {@link StoreProvider} handed to {@link MutateWork}, whose overridden accessors return the
+ * {@code Mutable} variant of each store.
+ * <p>
+ * {@link #getTaskStore()} is not narrowed here; mutable task access is only offered through
+ * {@link #getUnsafeTaskStore()}.
+ */
+ interface MutableStoreProvider extends StoreProvider {
+ SchedulerStore.Mutable getSchedulerStore();
+ JobStore.Mutable getJobStore();
+
+ /**
+ * Gets access to the mutable task store.
+ * <p>
+ * This is labeled as unsafe, since it's rare that a caller should be using this. In most
+ * cases, mutations to the task store should be done through
+ * {@link com.twitter.aurora.scheduler.state.StateManager}.
+ * <p>
+ * TODO(William Farner): Come up with a way to restrict access to this interface. As it stands,
+ * it's trivial for an unsuspecting caller to modify the task store directly and subvert the
+ * state machine and side effect systems.
+ *
+ * @return The mutable task store.
+ */
+ TaskStore.Mutable getUnsafeTaskStore();
+
+ LockStore.Mutable getLockStore();
+ QuotaStore.Mutable getQuotaStore();
+ AttributeStore.Mutable getAttributeStore();
+ }
+
+ /**
+ * Encapsulates a read-only storage operation.
+ *
+ * @param <T> The type of result this unit of work produces.
+ * @param <E> The type of exception this unit of work can throw.
+ */
+ interface Work<T, E extends Exception> {
+
+ /**
+ * Abstracts a unit of work that has a result, but may also throw a specific exception.
+ *
+ * @param storeProvider A provider to give access to different available stores.
+ * @return the result of the successfully completed unit of work
+ * @throws E if the unit of work could not be completed
+ */
+ T apply(StoreProvider storeProvider) throws E;
+
+ /**
+ * A convenient typedef for Work that throws no checked exceptions - it runs quietly.
+ *
+ * @param <T> The type of result this unit of work produces.
+ */
+ interface Quiet<T> extends Work<T, RuntimeException> {
+ // typedef
+ }
+ }
+
+ /**
+ * Encapsulates a storage operation, which has mutable storage access.
+ *
+ * @param <T> The type of result this unit of work produces.
+ * @param <E> The type of exception this unit of work can throw.
+ */
+ interface MutateWork<T, E extends Exception> {
+
+ NoResult.Quiet NOOP = new NoResult.Quiet() {
+ @Override protected void execute(Storage.MutableStoreProvider storeProvider) {
+ // No-op.
+ }
+ };
+
+ /**
+ * Abstracts a unit of work that should either commit a set of changes to storage as a side
+ * effect of successful completion or else commit no changes at all when an exception is thrown.
+ *
+ * @param storeProvider A provider to give access to different available stores.
+ * @return the result of the successfully completed unit of work
+ * @throws E if the unit of work could not be completed
+ */
+ T apply(MutableStoreProvider storeProvider) throws E;
+
+ /**
+ * A convenient typedef for MutateWork that throws no checked exceptions - it runs quietly.
+ *
+ * @param <T> The type of result this unit of work produces.
+ */
+ interface Quiet<T> extends MutateWork<T, RuntimeException> {
+ // typedef
+ }
+
+ /**
+ * Encapsulates work that returns no result.
+ *
+ * @param <E> The type of exception this unit of work can throw.
+ */
+ abstract class NoResult<E extends Exception> implements MutateWork<Void, E> {
+
+ @Override public final Void apply(MutableStoreProvider storeProvider) throws E {
+ execute(storeProvider);
+ return null;
+ }
+
+ /**
+ * Similar to {@link #apply(MutableStoreProvider)} except that no result is
+ * returned.
+ *
+ * @param storeProvider A provider to give access to different available stores.
+ * @throws E if the unit of work could not be completed
+ */
+ protected abstract void execute(MutableStoreProvider storeProvider) throws E;
+
+ /**
+ * A convenient typedef for Work with no result that throws no checked exceptions - it runs
+ * quietly.
+ */
+ public abstract static class Quiet extends NoResult<RuntimeException> {
+ // typedef
+ }
+ }
+ }
+
+ /**
+ * Indicates a problem reading from or writing to stable storage.
+ */
+ class StorageException extends SchedulerException {
+ public StorageException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public StorageException(String message) {
+ super(message);
+ }
+ }
+
+ /**
+ * Executes the unit of read-only {@code work}. All data in the stores may be expected to be
+ * consistent, as the invocation is mutually exclusive of any writes.
+ *
+ * @param work The unit of work to execute.
+ * @param <T> The type of result this unit of work produces.
+ * @param <E> The type of exception this unit of work can throw.
+ * @return the result when the unit of work completes successfully
+ * @throws StorageException if there was a problem reading from stable storage.
+ * @throws E bubbled transparently when the unit of work throws
+ */
+ <T, E extends Exception> T consistentRead(Work<T, E> work) throws StorageException, E;
+
+ /**
+ * Executes a unit of read-only {@code work}. This is functionally identical to
+ * {@link #consistentRead(Work)} with the exception that data in the stores may not be fully
+ * consistent.
+ *
+ * @param work The unit of work to execute.
+ * @param <T> The type of result this unit of work produces.
+ * @param <E> The type of exception this unit of work can throw.
+ * @return the result when the unit of work completes successfully
+ * @throws StorageException if there was a problem reading from stable storage.
+ * @throws E bubbled transparently when the unit of work throws
+ */
+ <T, E extends Exception> T weaklyConsistentRead(Work<T, E> work) throws StorageException, E;
+
+ /**
+ * Executes the unit of mutating {@code work}.
+ *
+ * @param work The unit of work to execute.
+ * @param <T> The type of result this unit of work produces.
+ * @param <E> The type of exception this unit of work can throw.
+ * @return the result when the unit of work completes successfully
+ * @throws StorageException if there was a problem reading from or writing to stable storage.
+ * @throws E bubbled transparently when the unit of work throws
+ */
+ <T, E extends Exception> T write(MutateWork<T, E> work) throws StorageException, E;
+
+ /**
+ * Clean up the underlying storage by optimizing internal data structures. Does not change
+ * externally-visible state but might not run concurrently with write operations.
+ */
+ void snapshot() throws StorageException;
+
+ /**
+ * A non-volatile storage that has additional methods to control its lifecycle.
+ */
+ interface NonVolatileStorage extends Storage {
+ /**
+ * Requests the underlying storage prepare its data set; ie: initialize schemas, begin syncing
+ * out of date data, etc. This method should not block.
+ *
+ * @throws StorageException if there was a problem preparing storage.
+ */
+ void prepare() throws StorageException;
+
+ /**
+ * Prepares the underlying storage for serving traffic.
+ *
+ * @param initializationLogic work to perform after this storage system is ready but before
+ * allowing general use of
+ * {@link #consistentRead}.
+ * @throws StorageException if there was a problem starting storage.
+ */
+ void start(MutateWork.NoResult.Quiet initializationLogic) throws StorageException;
+
+ /**
+ * Prepares the underlying storage system for clean shutdown.
+ */
+ void stop();
+ }
+
+ /**
+ * Identifies a storage layer that is in-memory only.
+ * This generally should only be used when the storage is first starting up, to perform queries
+ * related to initially loading the storage.
+ */
+ @Retention(RetentionPolicy.RUNTIME)
+ @Target({ ElementType.PARAMETER, ElementType.METHOD })
+ @BindingAnnotation
+ public @interface Volatile { }
+
+ /**
+ * Utility functions for interacting with a Storage instance.
+ */
+ public final class Util {
+
+ private Util() {
+ // Utility class.
+ }
+
+ /**
+ * Fetch tasks matching the query returned by {@code query} from {@code storage} in a
+ * consistent read operation.
+ *
+ * @see #weaklyConsistentFetchTasks
+ * @param storage Storage instance to query from.
+ * @param query Builder of the query to perform.
+ * @return Tasks returned from the query.
+ */
+ public static ImmutableSet<IScheduledTask> consistentFetchTasks(
+ Storage storage,
+ final Query.Builder query) {
+
+ return storage.consistentRead(new Work.Quiet<ImmutableSet<IScheduledTask>>() {
+ @Override public ImmutableSet<IScheduledTask> apply(StoreProvider storeProvider) {
+ return storeProvider.getTaskStore().fetchTasks(query);
+ }
+ });
+ }
+
+ /**
+ * Identical to {@link #consistentFetchTasks(Storage, Query.Builder)}, but fetches tasks using a
+ * weakly-consistent read operation.
+ *
+ * @see #consistentFetchTasks
+ * @param storage Storage instance to query from.
+ * @param query Builder of the query to perform.
+ * @return Tasks returned from the query.
+ */
+ public static ImmutableSet<IScheduledTask> weaklyConsistentFetchTasks(
+ Storage storage,
+ final Query.Builder query) {
+
+ return storage.weaklyConsistentRead(new Work.Quiet<ImmutableSet<IScheduledTask>>() {
+ @Override public ImmutableSet<IScheduledTask> apply(StoreProvider storeProvider) {
+ return storeProvider.getTaskStore().fetchTasks(query);
+ }
+ });
+ }
+
+ /**
+ * Fetch quota for {@code role} from {@code storage} in a consistent read operation.
+ *
+ * @param storage Storage instance to fetch quota from.
+ * @param role Role to fetch quota for.
+ * @return Quota returned from the fetch operation.
+ * @see QuotaStore#fetchQuota(String)
+ */
+ public static Optional<IQuota> consistentFetchQuota(Storage storage, final String role) {
+ return storage.consistentRead(new Work.Quiet<Optional<IQuota>>() {
+ @Override public Optional<IQuota> apply(StoreProvider storeProvider) {
+ return storeProvider.getQuotaStore().fetchQuota(role);
+ }
+ });
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/storage/StorageBackfill.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/StorageBackfill.java b/src/main/java/org/apache/aurora/scheduler/storage/StorageBackfill.java
new file mode 100644
index 0000000..df5b603
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/StorageBackfill.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.storage;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Logger;
+
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+
+import com.twitter.aurora.gen.JobConfiguration;
+import com.twitter.aurora.gen.ScheduleStatus;
+import com.twitter.aurora.gen.ScheduledTask;
+import com.twitter.aurora.gen.TaskConfig;
+import com.twitter.aurora.gen.TaskEvent;
+import com.twitter.aurora.scheduler.base.JobKeys;
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.base.Tasks;
+import com.twitter.aurora.scheduler.configuration.ConfigurationManager;
+import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import com.twitter.aurora.scheduler.storage.TaskStore.Mutable.TaskMutation;
+import com.twitter.aurora.scheduler.storage.entities.IJobConfiguration;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+import com.twitter.common.stats.Stats;
+import com.twitter.common.util.Clock;
+
+/**
+ * Utility class to contain and perform storage backfill operations.
+ */
+public final class StorageBackfill {
+
+ private static final Logger LOG = Logger.getLogger(StorageBackfill.class.getName());
+
+ private static final AtomicLong SHARD_SANITY_CHECK_FAILS =
+ Stats.exportLong("shard_sanity_check_failures");
+
+ private StorageBackfill() {
+ // Utility class.
+ }
+
+  /**
+   * Re-saves every job known to every job manager after applying configuration defaults to any
+   * fields that are unset.
+   *
+   * @param jobStore Job store to read jobs from and write the defaulted jobs back to.
+   */
+  private static void backfillJobDefaults(JobStore.Mutable jobStore) {
+    for (String managerId : jobStore.fetchManagerIds()) {
+      for (JobConfiguration mutableJob
+          : IJobConfiguration.toBuildersList(jobStore.fetchJobs(managerId))) {
+
+        ConfigurationManager.applyDefaultsIfUnset(mutableJob);
+        IJobConfiguration withDefaults = IJobConfiguration.build(mutableJob);
+        jobStore.saveAcceptedJob(managerId, withDefaults);
+      }
+    }
+  }
+
+  /**
+   * Sanity-checks that the job instance {@code task} belongs to has at most one active task.
+   * <p>
+   * Inactive tasks are ignored. When more than one active task is found for the instance, the
+   * failure is counted and logged, and {@code task} is marked {@code KILLED} unless its ID sorts
+   * last among the active task IDs.
+   *
+   * @param task Mutable task being loaded; its status and events may be rewritten.
+   * @param taskStore Store used to look up the other active tasks of the same instance.
+   * @param clock Clock used to timestamp the synthesized kill event.
+   */
+  private static void guaranteeShardUniqueness(
+      ScheduledTask task,
+      TaskStore.Mutable taskStore,
+      Clock clock) {
+
+    if (!Tasks.isActive(task.getStatus())) {
+      return;
+    }
+
+    // Perform a sanity check on the number of active shards.
+    TaskConfig config = task.getAssignedTask().getTask();
+    Query.Builder activeInInstance = Query.instanceScoped(
+        JobKeys.from(config.getOwner().getRole(), config.getEnvironment(), config.getJobName()),
+        task.getAssignedTask().getInstanceId())
+        .active();
+    Set<String> activeIds = FluentIterable.from(taskStore.fetchTasks(activeInInstance))
+        .transform(Tasks.SCHEDULED_TO_ID)
+        .toSet();
+    if (activeIds.size() <= 1) {
+      return;
+    }
+
+    String taskId = Tasks.id(task);
+    SHARD_SANITY_CHECK_FAILS.incrementAndGet();
+    LOG.severe("Active shard sanity check failed when loading " + taskId
+        + ", active tasks found: " + activeIds);
+
+    // We want to keep exactly one task from this shard, so sort the IDs and keep the
+    // highest (newest) in the hopes that it is legitimately running.
+    String newestId = Iterables.getLast(Sets.newTreeSet(activeIds));
+    if (taskId.equals(newestId)) {
+      LOG.info("Retaining task " + taskId);
+      return;
+    }
+
+    task.setStatus(ScheduleStatus.KILLED);
+    task.addToTaskEvents(new TaskEvent(clock.nowMillis(), ScheduleStatus.KILLED)
+        .setMessage("Killed duplicate shard."));
+    // TODO(wfarner): Circle back if this is necessary. Currently there's a race
+    // condition between the time the scheduler is actually available without hitting
+    // IllegalStateException (see DriverImpl).
+    // driver.killTask(Tasks.id(task));
+  }
+
+ // Exported counters named after instance ID field states. None of them is read or incremented
+ // anywhere else in this class; each Stats.exportLong call still runs when the class is
+ // initialized.
+ // NOTE(review): presumably left over from an instance ID migration - confirm nothing depends
+ // on these stat names being exported before removing them.
+ private static final AtomicLong BOTH_FIELDS_SET = Stats.exportLong("both_instance_ids_set");
+ private static final AtomicLong OLD_FIELD_SET = Stats.exportLong("old_instance_id_set");
+ private static final AtomicLong NEW_FIELD_SET = Stats.exportLong("new_instance_id_set");
+ private static final AtomicLong FIELDS_INCONSISTENT =
+ Stats.exportLong("instance_ids_inconsistent");
+
+  /**
+   * Maps a task in the {@code THROTTLED} state back to {@code PENDING}; tasks in any other state
+   * are left untouched. This keeps stored tasks backwards-compatible, since the throttled state
+   * exists in this version but is not handled.
+   *
+   * @param task Task to possibly rewrite.
+   */
+  private static void rewriteThrottledState(ScheduledTask task) {
+    boolean throttled = task.getStatus() == ScheduleStatus.THROTTLED;
+    if (!throttled) {
+      return;
+    }
+    task.setStatus(ScheduleStatus.PENDING);
+  }
+
+  /**
+   * Brings stored jobs and tasks in line with assumptions that may have changed since the
+   * structs were first written: job and task configuration defaults are applied, duplicate
+   * active tasks within an instance are marked killed, and throttled tasks are reset to pending.
+   *
+   * @param storeProvider Storage provider.
+   * @param clock Clock, used for timestamping backfilled task events.
+   */
+  public static void backfill(final MutableStoreProvider storeProvider, final Clock clock) {
+    backfillJobDefaults(storeProvider.getJobStore());
+
+    TaskMutation sanitizeTask = new TaskMutation() {
+      @Override public IScheduledTask apply(final IScheduledTask stored) {
+        ScheduledTask mutable = stored.newBuilder();
+        ConfigurationManager.applyDefaultsIfUnset(mutable.getAssignedTask().getTask());
+        // TODO(ksweeney): Guarantee tasks pass current validation code here and quarantine if they
+        // don't.
+        guaranteeShardUniqueness(mutable, storeProvider.getUnsafeTaskStore(), clock);
+        rewriteThrottledState(mutable);
+        return IScheduledTask.build(mutable);
+      }
+    };
+
+    LOG.info("Performing shard uniqueness sanity check.");
+    storeProvider.getUnsafeTaskStore().mutateTasks(Query.unscoped(), sanitizeTask);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java b/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
new file mode 100644
index 0000000..02e5096
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/TaskStore.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.storage;
+
+import java.util.Set;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableSet;
+
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
+
+/**
+ * Stores all tasks configured with the scheduler.
+ */
+public interface TaskStore {
+
+ /**
+ * Fetches a read-only view of tasks matching a query and filters. Intended for use with a
+ * {@link com.twitter.aurora.scheduler.base.Query.Builder}.
+ *
+ * @param query Builder of the query to identify tasks with.
+ * @return A read-only view of matching tasks.
+ */
+ ImmutableSet<IScheduledTask> fetchTasks(Query.Builder query);
+
+ public interface Mutable extends TaskStore {
+
+ /**
+ * A convenience interface to allow callers to more concisely implement a task mutation.
+ */
+ public interface TaskMutation extends Function<IScheduledTask, IScheduledTask> {
+ }
+
+ /**
+ * Saves tasks to the store. Tasks are copied internally, meaning that the tasks are stored in
+ * the state they were in when the method is called, and further object modifications will not
+ * affect the tasks. If any of the tasks already exist in the store, they will be overwritten
+ * by the supplied {@code tasks}.
+ *
+ * @param tasks Tasks to add.
+ */
+ void saveTasks(Set<IScheduledTask> tasks);
+
+ /**
+ * Removes all tasks from the store.
+ */
+ void deleteAllTasks();
+
+ /**
+ * Deletes specific tasks from the store.
+ *
+ * @param taskIds IDs of tasks to delete.
+ */
+ void deleteTasks(Set<String> taskIds);
+
+ /**
+ * Offers temporary mutable access to tasks. If a task ID is not found, it will be silently
+ * skipped, and no corresponding task will be returned.
+ *
+ * @param query Query to match tasks against.
+ * @param mutator The mutate operation.
+ * @return Immutable copies of only the tasks that were mutated.
+ */
+ ImmutableSet<IScheduledTask> mutateTasks(
+ Query.Builder query,
+ Function<IScheduledTask, IScheduledTask> mutator);
+
+ /**
+ * Rewrites a task's configuration in-place.
+ * <p>
+ * <b>WARNING</b>: this is a dangerous operation, and should not be used without exercising
+ * great care. This feature should be used as a last-ditch effort to rewrite things that
+ * the scheduler otherwise can't (e.g. {@link ITaskConfig#executorConfig}) rewrite in a
+ * controlled/tested backfill operation.
+ *
+ * @param taskId ID of the task to alter.
+ * @param taskConfiguration Configuration object to swap with the existing task's
+ * configuration.
+ * @return {@code true} if the modification took effect, or {@code false} if the task does not
+ * exist in the store.
+ */
+ boolean unsafeModifyInPlace(String taskId, ITaskConfig taskConfiguration);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/storage/backup/BackupModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/BackupModule.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/BackupModule.java
new file mode 100644
index 0000000..b6beba3
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/BackupModule.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.storage.backup;
+
+import java.io.File;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.inject.PrivateModule;
+import com.google.inject.Provides;
+import com.google.inject.TypeLiteral;
+
+import com.twitter.aurora.gen.storage.Snapshot;
+import com.twitter.aurora.scheduler.storage.SnapshotStore;
+import com.twitter.aurora.scheduler.storage.backup.Recovery.RecoveryImpl;
+import com.twitter.aurora.scheduler.storage.backup.StorageBackup.StorageBackupImpl;
+import com.twitter.aurora.scheduler.storage.backup.StorageBackup.StorageBackupImpl.BackupConfig;
+import com.twitter.aurora.scheduler.storage.backup.TemporaryStorage.TemporaryStorageFactory;
+import com.twitter.common.application.Lifecycle;
+import com.twitter.common.args.Arg;
+import com.twitter.common.args.CmdLine;
+import com.twitter.common.args.constraints.NotNull;
+import com.twitter.common.base.Command;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A module that will periodically save full storage backups to local disk and makes those backups
+ * available for on-line recovery.
+ */
+public class BackupModule extends PrivateModule {
+ private static final Logger LOG = Logger.getLogger(BackupModule.class.getName());
+
+ @CmdLine(name = "backup_interval", help = "Minimum interval on which to write a storage backup.")
+ private static final Arg<Amount<Long, Time>> BACKUP_INTERVAL =
+ Arg.create(Amount.of(1L, Time.HOURS));
+
+ @CmdLine(name = "max_saved_backups",
+ help = "Maximum number of backups to retain before deleting the oldest backups.")
+ private static final Arg<Integer> MAX_SAVED_BACKUPS = Arg.create(48);
+
+ @NotNull
+ @CmdLine(name = "backup_dir",
+ help = "Directory to store backups under. Will be created if it does not exist.")
+ private static final Arg<File> BACKUP_DIR = Arg.create();
+
+ private final Class<? extends SnapshotStore<Snapshot>> snapshotStore;
+ private final File unvalidatedBackupDir;
+
+ /**
+ * Creates a new backup module.
+ *
+ * @param snapshotStore Snapshot store implementation class.
+ */
+ public BackupModule(Class<? extends SnapshotStore<Snapshot>> snapshotStore) {
+ this(BACKUP_DIR.get(), snapshotStore);
+ }
+
+ /**
+ * Creates a new backup module using a given backupDir instead of a flagged one.
+ *
+ * @param backupDir Directory to write backups to.
+ * @param snapshotStore Snapshot store implementation class.
+ */
+ @VisibleForTesting
+ public BackupModule(File backupDir, Class<? extends SnapshotStore<Snapshot>> snapshotStore) {
+ this.unvalidatedBackupDir = checkNotNull(backupDir);
+ this.snapshotStore = checkNotNull(snapshotStore);
+ }
+
+ /**
+ * Wires up the backup and recovery bindings. Only the un-annotated
+ * {@code SnapshotStore<Snapshot>}, {@link StorageBackup} and {@link Recovery} bindings are
+ * exposed; everything else stays private to this module.
+ */
+ @Override
+ protected void configure() {
+ // The SnapshotDelegate-annotated store resolves to the implementation class handed to this
+ // module's constructor.
+ TypeLiteral<SnapshotStore<Snapshot>> type = new TypeLiteral<SnapshotStore<Snapshot>>() { };
+ bind(type).annotatedWith(StorageBackupImpl.SnapshotDelegate.class).to(snapshotStore);
+
+ // A singleton StorageBackupImpl backs both the un-annotated SnapshotStore binding and
+ // StorageBackup.
+ bind(type).to(StorageBackupImpl.class);
+ bind(StorageBackup.class).to(StorageBackupImpl.class);
+ bind(StorageBackupImpl.class).in(Singleton.class);
+ expose(type);
+ expose(StorageBackup.class);
+
+ // Not exposed: matches the Function<Snapshot, TemporaryStorage> parameter of RecoveryImpl's
+ // constructor.
+ bind(new TypeLiteral<Function<Snapshot, TemporaryStorage>>() { })
+ .to(TemporaryStorageFactory.class);
+
+ // Not exposed: the Command seen inside this module is LifecycleHook, which shuts down the
+ // application lifecycle when executed.
+ bind(Command.class).to(LifecycleHook.class);
+ bind(Recovery.class).to(RecoveryImpl.class);
+ bind(RecoveryImpl.class).in(Singleton.class);
+ expose(Recovery.class);
+ }
+
+ /**
+ * A {@link Command} that shuts down the injected {@link Lifecycle} when executed. It is the
+ * {@link Command} implementation bound in {@link #configure()}.
+ */
+ static class LifecycleHook implements Command {
+ private final Lifecycle lifecycle;
+
+ @Inject LifecycleHook(Lifecycle lifecycle) {
+ this.lifecycle = checkNotNull(lifecycle);
+ }
+
+ @Override public void execute() {
+ lifecycle.shutdown();
+ }
+ }
+
+  /**
+   * Validates the configured backup directory, creating it first if it does not exist.
+   *
+   * @return The backup directory, which at the time of the call exists, is a directory and is
+   *     writable.
+   * @throws IllegalArgumentException If the directory cannot be created, the path refers to
+   *     something other than a directory, or the directory is not writable.
+   */
+  @Provides
+  private File provideBackupDir() {
+    if (!unvalidatedBackupDir.exists()) {
+      if (unvalidatedBackupDir.mkdirs()) {
+        LOG.info("Created backup dir " + unvalidatedBackupDir.getPath() + ".");
+      } else if (!unvalidatedBackupDir.isDirectory()) {
+        // mkdirs() also reports failure when the directory was created by someone else between
+        // the exists() check and this call, so only give up if it is still missing.
+        throw new IllegalArgumentException(
+            "Unable to create backup dir " + unvalidatedBackupDir.getPath() + ".");
+      }
+    }
+
+    // exists() is also true for a regular file, and a writable file would pass canWrite() below.
+    if (!unvalidatedBackupDir.isDirectory()) {
+      throw new IllegalArgumentException(
+          "Backup dir " + unvalidatedBackupDir.getPath() + " is not a directory.");
+    }
+
+    if (!unvalidatedBackupDir.canWrite()) {
+      throw new IllegalArgumentException(
+          "Backup dir " + unvalidatedBackupDir.getPath() + " is not writable.");
+    }
+
+    return unvalidatedBackupDir;
+  }
+
+  /**
+   * Assembles the backup configuration from the injected backup directory and the values of the
+   * {@code max_saved_backups} and {@code backup_interval} command line arguments.
+   *
+   * @param backupDir Directory that backups are written to.
+   * @return The backup configuration.
+   */
+  @Provides
+  private BackupConfig provideBackupConfig(File backupDir) {
+    Integer maxSavedBackups = MAX_SAVED_BACKUPS.get();
+    Amount<Long, Time> interval = BACKUP_INTERVAL.get();
+    return new BackupConfig(backupDir, maxSavedBackups, interval);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java
new file mode 100644
index 0000000..4e91342
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/Recovery.java
@@ -0,0 +1,210 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.storage.backup;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.io.Files;
+import com.google.common.util.concurrent.Atomics;
+
+import com.twitter.aurora.codec.ThriftBinaryCodec;
+import com.twitter.aurora.codec.ThriftBinaryCodec.CodingException;
+import com.twitter.aurora.gen.storage.Snapshot;
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.storage.DistributedSnapshotStore;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import com.twitter.aurora.scheduler.storage.Storage.MutateWork;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+import com.twitter.common.base.Command;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A recovery mechanism that works with {@link StorageBackup} to provide a two-step storage
+ * recovery process.
+ */
+public interface Recovery {
+
+ /**
+ * List backups available for recovery.
+ *
+ * @return Available backup IDs.
+ */
+ Set<String> listBackups();
+
+ /**
+ * Loads a backup in 'staging' so that it may be queried and modified prior to committing.
+ *
+ * @param backupName Name of the backup to load.
+ * @throws RecoveryException If the backup could not be found or loaded.
+ */
+ void stage(String backupName) throws RecoveryException;
+
+ /**
+ * Queries a staged backup.
+ *
+ * @param query Builder of query to perform.
+ * @return Tasks matching the query.
+ * @throws RecoveryException If a backup is not staged, or could not be queried.
+ */
+ Set<IScheduledTask> query(Query.Builder query) throws RecoveryException;
+
+ /**
+ * Deletes tasks from a staged backup.
+ *
+ * @param query Query builder for tasks to delete.
+ * @throws RecoveryException If a backup is not staged, or tasks could not be deleted.
+ */
+ void deleteTasks(Query.Builder query) throws RecoveryException;
+
+ /**
+ * Unloads a staged backup.
+ */
+ void unload();
+
+ /**
+ * Commits a staged backup to the main storage system.
+ *
+ * @throws RecoveryException If a backup is not staged, or the commit failed.
+ */
+ void commit() throws RecoveryException;
+
+ /**
+ * Thrown when a recovery operation could not be completed due to internal errors or improper
+ * invocation order.
+ */
+ public static class RecoveryException extends Exception {
+ RecoveryException(String message) {
+ super(message);
+ }
+
+ RecoveryException(String message, Throwable cause) {
+ super(message, cause);
+ }
+ }
+
+ class RecoveryImpl implements Recovery {
+ private final File backupDir;
+ private final Function<Snapshot, TemporaryStorage> tempStorageFactory;
+ private final AtomicReference<PendingRecovery> recovery;
+ private final Storage primaryStorage;
+ private final DistributedSnapshotStore distributedStore;
+ private final Command shutDownNow;
+
+ @Inject
+ RecoveryImpl(
+ File backupDir,
+ Function<Snapshot, TemporaryStorage> tempStorageFactory,
+ Storage primaryStorage,
+ DistributedSnapshotStore distributedStore,
+ Command shutDownNow) {
+
+ this.backupDir = checkNotNull(backupDir);
+ this.tempStorageFactory = checkNotNull(tempStorageFactory);
+ this.recovery = Atomics.newReference();
+ this.primaryStorage = checkNotNull(primaryStorage);
+ this.distributedStore = checkNotNull(distributedStore);
+ this.shutDownNow = checkNotNull(shutDownNow);
+ }
+
+ @Override public Set<String> listBackups() {
+ return ImmutableSet.<String>builder().add(backupDir.list()).build();
+ }
+
+    /**
+     * Reads and decodes the named backup file, loads it into a temporary storage, and registers
+     * that as the pending recovery.
+     *
+     * @param backupName Name of the backup file, relative to the backup directory.
+     * @throws RecoveryException If the file does not exist, cannot be read or decoded, or another
+     *     backup is already staged.
+     */
+    @Override public void stage(String backupName) throws RecoveryException {
+      // NOTE(review): backupName is joined to backupDir without normalization - confirm callers
+      // only pass names obtained from listBackups().
+      File source = new File(backupDir, backupName);
+      if (!source.exists()) {
+        throw new RecoveryException("Backup " + backupName + " does not exist.");
+      }
+
+      Snapshot decoded;
+      try {
+        byte[] raw = Files.toByteArray(source);
+        decoded = ThriftBinaryCodec.decode(Snapshot.class, raw);
+      } catch (CodingException e) {
+        throw new RecoveryException("Failed to decode backup " + e, e);
+      } catch (IOException e) {
+        throw new RecoveryException("Failed to read backup " + e, e);
+      }
+
+      // Only one recovery may be staged at a time; the swap fails if a recovery is already set.
+      PendingRecovery staged = new PendingRecovery(tempStorageFactory.apply(decoded));
+      if (!recovery.compareAndSet(null, staged)) {
+        throw new RecoveryException("Another backup is already loaded.");
+      }
+    }
+
+    /**
+     * Returns the currently staged recovery.
+     *
+     * @return The staged recovery.
+     * @throws RecoveryException If no backup has been staged.
+     */
+    private PendingRecovery getLoadedRecovery() throws RecoveryException {
+      @Nullable PendingRecovery staged = recovery.get();
+      if (staged != null) {
+        return staged;
+      }
+      throw new RecoveryException("No backup loaded.");
+    }
+
+ @Override public Set<IScheduledTask> query(Query.Builder query) throws RecoveryException {
+ return getLoadedRecovery().query(query);
+ }
+
+ @Override public void deleteTasks(Query.Builder query) throws RecoveryException {
+ getLoadedRecovery().delete(query);
+ }
+
+ @Override public void unload() {
+ recovery.set(null);
+ }
+
+ @Override public void commit() throws RecoveryException {
+ getLoadedRecovery().commit();
+ }
+
+ /**
+ * A staged backup, held in a {@link TemporaryStorage} so it can be queried and pruned before
+ * being committed.
+ */
+ private class PendingRecovery {
+ private final TemporaryStorage tempStorage;
+
+ PendingRecovery(TemporaryStorage tempStorage) {
+ this.tempStorage = tempStorage;
+ }
+
+ /**
+ * Persists a snapshot of the temporary storage to the distributed store and then executes
+ * the shutdown command, all inside a write operation on the primary storage.
+ *
+ * @throws IllegalStateException If the snapshot could not be encoded.
+ */
+ void commit() {
+ primaryStorage.write(new MutateWork.NoResult.Quiet() {
+ @Override protected void execute(MutableStoreProvider storeProvider) {
+ try {
+ distributedStore.persist(tempStorage.toSnapshot());
+ // Only reached when persist() did not throw.
+ shutDownNow.execute();
+ } catch (CodingException e) {
+ throw new IllegalStateException("Failed to encode snapshot.", e);
+ }
+ }
+ });
+ }
+
+ /**
+ * Fetches the tasks in the staged backup that match {@code query}.
+ */
+ Set<IScheduledTask> query(final Query.Builder query) {
+ return tempStorage.fetchTasks(query);
+ }
+
+ /**
+ * Deletes the tasks matching {@code query} from the staged backup.
+ */
+ void delete(final Query.Builder query) {
+ tempStorage.deleteTasks(query);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/storage/backup/StorageBackup.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/StorageBackup.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/StorageBackup.java
new file mode 100644
index 0000000..3faeb1f
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/StorageBackup.java
@@ -0,0 +1,209 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.storage.backup;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Ordering;
+import com.google.common.io.Files;
+import com.google.inject.BindingAnnotation;
+
+import com.twitter.aurora.codec.ThriftBinaryCodec;
+import com.twitter.aurora.codec.ThriftBinaryCodec.CodingException;
+import com.twitter.aurora.gen.storage.Snapshot;
+import com.twitter.aurora.scheduler.storage.SnapshotStore;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.Stats;
+import com.twitter.common.util.Clock;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A backup routine that layers over a snapshot store and periodically writes snapshots to
+ * local disk.
+ *
+ * TODO(William Farner): Perform backups asynchronously. As written, they are performed in a
+ * blocking write operation, which is asking for trouble.
+ */
+public interface StorageBackup {
+
+ /**
+ * Perform a storage backup immediately, blocking until it is complete.
+ */
+ void backupNow();
+
+ class StorageBackupImpl implements StorageBackup, SnapshotStore<Snapshot> {
+ private static final Logger LOG = Logger.getLogger(StorageBackup.class.getName());
+
+ private static final String FILE_PREFIX = "scheduler-backup-";
+ private final BackupConfig config;
+
+ static class BackupConfig {
+ private final File dir;
+ private final int maxBackups;
+ private final Amount<Long, Time> interval;
+
+ BackupConfig(File dir, int maxBackups, Amount<Long, Time> interval) {
+ this.dir = checkNotNull(dir);
+ this.maxBackups = maxBackups;
+ this.interval = checkNotNull(interval);
+ }
+
+ @VisibleForTesting
+ File getDir() {
+ return dir;
+ }
+ }
+
+ /**
+ * Binding annotation that the underlying {@link SnapshotStore} must be bound with.
+ */
+ @BindingAnnotation
+ @Target({FIELD, PARAMETER, METHOD}) @Retention(RUNTIME)
+ @interface SnapshotDelegate { }
+
+ private final SnapshotStore<Snapshot> delegate;
+ private final Clock clock;
+ private final long backupIntervalMs;
+ private volatile long lastBackupMs;
+ private final DateFormat backupDateFormat;
+
+ private final AtomicLong successes = Stats.exportLong("scheduler_backup_success");
+ @VisibleForTesting
+ AtomicLong getSuccesses() {
+ return successes;
+ }
+
+ private final AtomicLong failures = Stats.exportLong("scheduler_backup_failed");
+ @VisibleForTesting
+ AtomicLong getFailures() {
+ return failures;
+ }
+
+ @Inject
+ StorageBackupImpl(
+ @SnapshotDelegate SnapshotStore<Snapshot> delegate,
+ Clock clock,
+ BackupConfig config) {
+
+ this.delegate = checkNotNull(delegate);
+ this.clock = checkNotNull(clock);
+ this.config = checkNotNull(config);
+ backupDateFormat = new SimpleDateFormat("yyyy-MM-dd-HH-mm");
+ backupIntervalMs = config.interval.as(Time.MILLISECONDS);
+ lastBackupMs = clock.nowMillis();
+ }
+
+ @Override public Snapshot createSnapshot() {
+ Snapshot snapshot = delegate.createSnapshot();
+ if (clock.nowMillis() >= (lastBackupMs + backupIntervalMs)) {
+ save(snapshot);
+ }
+ return snapshot;
+ }
+
+ @Override public void backupNow() {
+ save(delegate.createSnapshot());
+ }
+
+ @VisibleForTesting
+ String createBackupName() {
+ return FILE_PREFIX + backupDateFormat.format(new Date(clock.nowMillis()));
+ }
+
+ private void save(Snapshot snapshot) {
+ lastBackupMs = clock.nowMillis();
+
+ String backupName = createBackupName();
+ String tempBackupName = "temp_" + backupName;
+ File tempFile = new File(config.dir, tempBackupName);
+ LOG.info("Saving backup to " + tempFile);
+ try {
+ byte[] backup = ThriftBinaryCodec.encodeNonNull(snapshot);
+ Files.write(backup, tempFile);
+ Files.move(tempFile, new File(config.dir, backupName));
+ successes.incrementAndGet();
+ } catch (IOException e) {
+ failures.incrementAndGet();
+ LOG.log(Level.SEVERE, "Failed to prepare backup " + backupName + ": " + e, e);
+ } catch (CodingException e) {
+ LOG.log(Level.SEVERE, "Failed to encode backup " + backupName + ": " + e, e);
+ failures.incrementAndGet();
+ } finally {
+ if (tempFile.exists()) {
+ LOG.info("Deleting incomplete backup file " + tempFile);
+ tempFile.delete();
+ }
+ }
+
+ File[] backups = config.dir.listFiles(BACKUP_FILTER);
+ if (backups == null) {
+ LOG.severe("Failed to list backup dir " + config.dir);
+ } else {
+ int backupsToDelete = backups.length - config.maxBackups;
+ if (backupsToDelete > 0) {
+ List<File> toDelete = Ordering.natural()
+ .onResultOf(FILE_NAME)
+ .sortedCopy(ImmutableList.copyOf(backups)).subList(0, backupsToDelete);
+ LOG.info("Deleting " + backupsToDelete + " outdated backups: " + toDelete);
+ for (File outdated : toDelete) {
+ outdated.delete();
+ }
+ }
+ }
+ }
+
+ private static final FilenameFilter BACKUP_FILTER = new FilenameFilter() {
+ @Override public boolean accept(File file, String s) {
+ return s.startsWith(FILE_PREFIX);
+ }
+ };
+
+ @VisibleForTesting
+ static final Function<File, String> FILE_NAME = new Function<File, String>() {
+ @Override public String apply(File file) {
+ return file.getName();
+ }
+ };
+
+ @Override
+ public void applySnapshot(Snapshot snapshot) {
+ delegate.applySnapshot(snapshot);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
new file mode 100644
index 0000000..e0906fe
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/backup/TemporaryStorage.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.storage.backup;
+
+import java.util.Set;
+
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
+
+import com.twitter.aurora.gen.storage.Snapshot;
+import com.twitter.aurora.scheduler.base.Query;
+import com.twitter.aurora.scheduler.base.Tasks;
+import com.twitter.aurora.scheduler.storage.SnapshotStore;
+import com.twitter.aurora.scheduler.storage.Storage;
+import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import com.twitter.aurora.scheduler.storage.Storage.MutateWork;
+import com.twitter.aurora.scheduler.storage.Storage.StoreProvider;
+import com.twitter.aurora.scheduler.storage.Storage.Work;
+import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
+import com.twitter.aurora.scheduler.storage.log.SnapshotStoreImpl;
+import com.twitter.aurora.scheduler.storage.mem.MemStorage;
+import com.twitter.common.util.testing.FakeClock;
+
+/**
+ * A short-lived in-memory storage system that can be converted to a {@link Snapshot}.
+ */
+interface TemporaryStorage {
+
+ /**
+ * Removes every task that matches the query. Snapshots produced by {@link #toSnapshot()}
+ * afterwards will not contain the removed tasks.
+ *
+ * @param query Query builder selecting the tasks to remove.
+ */
+ void deleteTasks(Query.Builder query);
+
+ /**
+ * Looks up the tasks that match a query.
+ *
+ * @param query Query builder selecting the tasks to return.
+ * @return The matching tasks.
+ */
+ Set<IScheduledTask> fetchTasks(Query.Builder query);
+
+ /**
+ * Captures the current contents of the temporary storage as a snapshot.
+ *
+ * @return Snapshot of the temporary storage.
+ */
+ Snapshot toSnapshot();
+
+ /**
+ * Builds temporary storage instances from snapshots. Each instance is backed by its own empty
+ * {@link MemStorage} that has the snapshot applied to it, with a clock fixed at the snapshot's
+ * timestamp.
+ */
+ class TemporaryStorageFactory implements Function<Snapshot, TemporaryStorage> {
+ @Override public TemporaryStorage apply(Snapshot snapshot) {
+ final Storage detached = MemStorage.newEmptyStorage();
+
+ // Pin the clock to the snapshot's own timestamp.
+ FakeClock frozenClock = new FakeClock();
+ frozenClock.setNowMillis(snapshot.getTimestamp());
+
+ final SnapshotStore<Snapshot> snapshotter = new SnapshotStoreImpl(frozenClock, detached);
+ snapshotter.applySnapshot(snapshot);
+
+ return new TemporaryStorage() {
+ @Override public Set<IScheduledTask> fetchTasks(final Query.Builder query) {
+ Work.Quiet<Set<IScheduledTask>> lookup = new Work.Quiet<Set<IScheduledTask>>() {
+ @Override public Set<IScheduledTask> apply(StoreProvider storeProvider) {
+ return storeProvider.getTaskStore().fetchTasks(query);
+ }
+ };
+ return detached.consistentRead(lookup);
+ }
+
+ @Override public void deleteTasks(final Query.Builder query) {
+ detached.write(new MutateWork.NoResult.Quiet() {
+ @Override protected void execute(MutableStoreProvider storeProvider) {
+ // Resolve the query to task IDs first, then delete by ID.
+ Set<String> matchedIds =
+ FluentIterable.from(storeProvider.getTaskStore().fetchTasks(query))
+ .transform(Tasks.SCHEDULED_TO_ID)
+ .toSet();
+ storeProvider.getUnsafeTaskStore().deleteTasks(matchedIds);
+ }
+ });
+ }
+
+ @Override public Snapshot toSnapshot() {
+ return snapshotter.createSnapshot();
+ }
+ };
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/storage/log/Entries.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/Entries.java b/src/main/java/org/apache/aurora/scheduler/storage/log/Entries.java
new file mode 100644
index 0000000..74e8c07
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/Entries.java
@@ -0,0 +1,137 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.storage.log;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Logger;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
+
+import com.google.common.base.Preconditions;
+import com.google.common.io.ByteStreams;
+
+import com.twitter.aurora.codec.ThriftBinaryCodec;
+import com.twitter.aurora.codec.ThriftBinaryCodec.CodingException;
+import com.twitter.aurora.gen.storage.LogEntry;
+import com.twitter.aurora.gen.storage.LogEntry._Fields;
+import com.twitter.common.stats.Stats;
+
+/**
+ * Utility class for working with log entries.
+ */
+final class Entries {
+
+ private static final Logger LOG = Logger.getLogger(Entries.class.getName());
+
+ private static final AtomicLong COMPRESSION_BYTES_SAVED =
+ Stats.exportLong("log_compressed_entry_bytes_saved");
+
+ private Entries() {
+ // Utility class.
+ }
+
+ /**
+ * Deflates a log entry and wraps it in a deflated entry.
+ * <p>
+ * This will encode the entry using the thrift binary codec, and will apply deflate compression to
+ * the resulting encoded data.
+ * <p>
+ * This operation is symmetric with {@link #inflate(LogEntry)}.
+ *
+ * @param entry Entry to deflate.
+ * @return An entry with the {@code deflatedEntry} field set with the deflated serialized value
+ * of the original entry.
+ * @throws CodingException If the value could not be encoded or deflated.
+ */
+ static LogEntry deflate(LogEntry entry) throws CodingException {
+ byte[] data = thriftBinaryEncode(entry);
+ int initialLength = data.length;
+ LOG.info("Deflating log entry of size " + initialLength);
+ ByteArrayOutputStream deflated = new ByteArrayOutputStream();
+ DeflaterOutputStream deflater = new DeflaterOutputStream(deflated);
+ try {
+ deflater.write(data);
+ deflater.flush();
+ // Closing finishes the deflate stream; the bytes must be read only after this.
+ deflater.close();
+ byte[] deflatedData = deflated.toByteArray();
+ int bytesSaved = initialLength - deflatedData.length;
+ if (bytesSaved < 0) {
+ LOG.warning("Deflated entry is larger than original by " + (bytesSaved * -1) + " bytes");
+ } else {
+ LOG.info("Deflated log entry size: " + deflatedData.length + " (saved " + bytesSaved + ")");
+ }
+
+ // May be negative when compression made the entry larger.
+ COMPRESSION_BYTES_SAVED.addAndGet(bytesSaved);
+ return LogEntry.deflatedEntry(ByteBuffer.wrap(deflatedData));
+ } catch (IOException e) {
+ throw new CodingException("Failed to deflate snapshot: " + e, e);
+ }
+ }
+
+ /**
+ * Inflates and deserializes a deflated log entry.
+ * <p>
+ * This requires that the {@code deflatedEntry} field is set on the provided {@code entry}.
+ * The encoded value will be inflated and deserialized as a {@link LogEntry}.
+ *
+ * @param entry Entry to inflate, which must be a deflated entry.
+ * @return The inflated entry.
+ * @throws CodingException If the value could not be inflated or decoded.
+ */
+ static LogEntry inflate(LogEntry entry) throws CodingException {
+ Preconditions.checkArgument(entry.isSet(_Fields.DEFLATED_ENTRY));
+
+ ByteArrayOutputStream inflated = new ByteArrayOutputStream();
+ ByteBuffer data = entry.bufferForDeflatedEntry();
+ int length = data.remaining();
+ LOG.info("Inflating deflated log entry of size " + length);
+
+ final byte[] deflatedBytes;
+ final int offset;
+ if (data.hasArray()) {
+ // position() is relative to arrayOffset(), which is non-zero for sliced buffers.
+ deflatedBytes = data.array();
+ offset = data.arrayOffset() + data.position();
+ } else {
+ // Direct or read-only buffer: copy through a duplicate so the buffer's own position is
+ // left untouched.
+ deflatedBytes = new byte[length];
+ data.duplicate().get(deflatedBytes);
+ offset = 0;
+ }
+
+ InflaterInputStream inflater = new InflaterInputStream(
+ new ByteArrayInputStream(deflatedBytes, offset, length));
+ try {
+ ByteStreams.copy(inflater, inflated);
+ byte[] inflatedData = inflated.toByteArray();
+ LOG.info("Inflated log entry size: " + inflatedData.length);
+ return thriftBinaryDecode(inflatedData);
+ } catch (IOException e) {
+ throw new CodingException("Failed to inflate compressed log entry.", e);
+ }
+ }
+
+ /**
+ * Thrift binary-encodes a log entry.
+ *
+ * @param entry The entry to encode.
+ * @return The serialized entry value.
+ * @throws CodingException If the entry could not be encoded.
+ */
+ static byte[] thriftBinaryEncode(LogEntry entry) throws CodingException {
+ return ThriftBinaryCodec.encodeNonNull(entry);
+ }
+
+ /**
+ * Decodes a byte array containing thrift binary-encoded data.
+ *
+ * @param contents The data to decode.
+ * @return The deserialized entry.
+ * @throws CodingException If the entry could not be deserialized.
+ */
+ static LogEntry thriftBinaryDecode(byte[] contents) throws CodingException {
+ return ThriftBinaryCodec.decodeNonNull(LogEntry.class, contents);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/org/apache/aurora/scheduler/storage/log/LogManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogManager.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogManager.java
new file mode 100644
index 0000000..da29401
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogManager.java
@@ -0,0 +1,516 @@
+/*
+ * Copyright 2013 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.aurora.scheduler.storage.log;
+
+import java.io.IOException;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Logger;
+
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.primitives.Bytes;
+import com.google.inject.BindingAnnotation;
+
+import com.twitter.aurora.codec.ThriftBinaryCodec.CodingException;
+import com.twitter.aurora.gen.ScheduledTask;
+import com.twitter.aurora.gen.storage.Frame;
+import com.twitter.aurora.gen.storage.FrameChunk;
+import com.twitter.aurora.gen.storage.FrameHeader;
+import com.twitter.aurora.gen.storage.LogEntry;
+import com.twitter.aurora.gen.storage.LogEntry._Fields;
+import com.twitter.aurora.gen.storage.Op;
+import com.twitter.aurora.gen.storage.RemoveTasks;
+import com.twitter.aurora.gen.storage.SaveHostAttributes;
+import com.twitter.aurora.gen.storage.SaveTasks;
+import com.twitter.aurora.gen.storage.Snapshot;
+import com.twitter.aurora.gen.storage.Transaction;
+import com.twitter.aurora.gen.storage.storageConstants;
+import com.twitter.aurora.scheduler.log.Log;
+import com.twitter.aurora.scheduler.log.Log.Entry;
+import com.twitter.aurora.scheduler.log.Log.Position;
+import com.twitter.aurora.scheduler.log.Log.Stream;
+import com.twitter.aurora.scheduler.log.Log.Stream.InvalidPositionException;
+import com.twitter.aurora.scheduler.log.Log.Stream.StreamAccessException;
+import com.twitter.common.application.ShutdownRegistry;
+import com.twitter.common.base.Closure;
+import com.twitter.common.base.ExceptionalCommand;
+import com.twitter.common.inject.TimedInterceptor.Timed;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Data;
+import com.twitter.common.stats.Stats;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Manages opening, reading from and writing to a {@link Log}.
+ */
+public final class LogManager {
+
+ /**
+ * Identifies the maximum log entry size to permit before chunking entries into frames.
+ * Qualifies the {@code Amount<Integer, Data>} injected into the {@link LogManager} constructor.
+ */
+ @Retention(RetentionPolicy.RUNTIME)
+ @Target({ ElementType.PARAMETER, ElementType.METHOD })
+ @BindingAnnotation
+ public @interface MaxEntrySize { }
+
+ /**
+ * Binding annotation for settings regarding the way snapshots are written. In the
+ * {@link LogManager} constructor it qualifies the boolean that controls whether snapshots are
+ * deflated before being appended to the log.
+ */
+ @Retention(RetentionPolicy.RUNTIME)
+ @Target({ ElementType.PARAMETER, ElementType.METHOD })
+ @BindingAnnotation
+ public @interface SnapshotSetting { }
+
+ private static final Logger LOG = Logger.getLogger(LogManager.class.getName());
+
+ private final Log log;
+ private final Amount<Integer, Data> maxEntrySize;
+ private final boolean deflateSnapshots;
+ private final ShutdownRegistry shutdownRegistry;
+
+ /**
+ * Creates a log manager.
+ *
+ * @param log The log to open; must not be null.
+ * @param maxEntrySize Largest serialized entry written as a single record; must not be null.
+ * @param deflateSnapshots Whether snapshot entries are deflated before being written.
+ * @param shutdownRegistry Registry that receives an action to close each opened stream; must
+ * not be null.
+ */
+ @Inject
+ LogManager(
+ Log log,
+ @MaxEntrySize Amount<Integer, Data> maxEntrySize,
+ @SnapshotSetting boolean deflateSnapshots,
+ ShutdownRegistry shutdownRegistry) {
+
+ this.log = checkNotNull(log);
+ this.maxEntrySize = checkNotNull(maxEntrySize);
+ this.deflateSnapshots = deflateSnapshots;
+ this.shutdownRegistry = checkNotNull(shutdownRegistry);
+ }
+
+ /**
+ * Opens the underlying log and wraps the resulting stream in a {@link StreamManager}. An action
+ * that closes the stream is added to the shutdown registry before the manager is returned.
+ *
+ * @return A stream manager that can be used to read from and write to the log stream.
+ * @throws IOException If there is a problem opening the log.
+ */
+ public StreamManager open() throws IOException {
+ final Stream opened = log.open();
+
+ ExceptionalCommand<IOException> closeStream = new ExceptionalCommand<IOException>() {
+ @Override public void execute() throws IOException {
+ opened.close();
+ }
+ };
+ shutdownRegistry.addAction(closeStream);
+
+ return new StreamManager(opened, deflateSnapshots, maxEntrySize);
+ }
+
+ /**
+ * Manages interaction with the log stream. Log entries can be
+ * {@link #readFromBeginning(com.twitter.common.base.Closure) read from} the beginning,
+ * a {@link #startTransaction() transaction} consisting of one or more local storage
+ * operations can be committed atomically, or the log can be compacted by
+ * {@link #snapshot(com.twitter.aurora.gen.storage.Snapshot) snapshotting}.
+ */
+ public static class StreamManager {
+
+ /**
+ * Creates a new MD5 {@link MessageDigest}, used for checksumming framed log entries.
+ *
+ * @return A fresh MD5 digest instance.
+ * @throws IllegalStateException If the runtime has no provider for MD5.
+ */
+ private static MessageDigest createDigest() {
+ try {
+ return MessageDigest.getInstance("MD5");
+ } catch (NoSuchAlgorithmException e) {
+ throw new IllegalStateException("Could not find provider for standard algorithm 'MD5'", e);
+ }
+ }
+
+ /**
+ * Counters for stream activity, each exported through {@link Stats} under the name given at
+ * its declaration.
+ */
+ private static class Vars {
+ // Incremented on each committed transaction and reset to zero when a snapshot is written.
+ private final AtomicInteger unSnapshottedTransactions =
+ Stats.exportInt("scheduler_log_un_snapshotted_transactions");
+ private final AtomicLong bytesWritten = Stats.exportLong("scheduler_log_bytes_written");
+ private final AtomicLong entriesWritten = Stats.exportLong("scheduler_log_entries_written");
+ // Incremented whenever a framed entry is found to be incomplete while reading.
+ private final AtomicLong badFramesRead = Stats.exportLong("scheduler_log_bad_frames_read");
+ private final AtomicLong bytesRead = Stats.exportLong("scheduler_log_bytes_read");
+ private final AtomicLong entriesRead = Stats.exportLong("scheduler_log_entries_read");
+ private final AtomicLong deflatedEntriesRead =
+ Stats.exportLong("scheduler_log_deflated_entries_read");
+ private final AtomicLong snapshots = Stats.exportLong("scheduler_log_snapshots");
+ }
+ private final Vars vars = new Vars();
+
+ private final Object writeMutex = new Object();
+ private final Stream stream;
+ private final boolean deflateSnapshots;
+ private final MessageDigest digest;
+ private final EntrySerializer entrySerializer;
+
+ /**
+ * Creates a manager for an opened log stream.
+ *
+ * @param stream The stream to read from and append to; must not be null.
+ * @param deflateSnapshots Whether snapshot entries are deflated before being written.
+ * @param maxEntrySize Largest serialized entry written as a single record.
+ */
+ StreamManager(Stream stream, boolean deflateSnapshots, Amount<Integer, Data> maxEntrySize) {
+ this.stream = checkNotNull(stream);
+ this.deflateSnapshots = deflateSnapshots;
+ digest = createDigest();
+ // The serializer uses this same digest instance, so reads and writes share it.
+ entrySerializer = new EntrySerializer(digest, maxEntrySize);
+ }
+
+ /**
+ * Reads every entry in the log stream, from the beginning, and hands each decoded entry to
+ * the reader. Entries that were split into frames are reassembled first, incomplete frame
+ * sequences are skipped, and deflated entries are inflated before being handed over.
+ *
+ * @param reader A reader that will be handed log entries decoded from the stream.
+ * @throws CodingException if there was a problem decoding a log entry from the stream.
+ * @throws InvalidPositionException declared for callers; presumably propagated from the
+ * underlying stream - TODO confirm.
+ * @throws StreamAccessException if there is a problem reading from the log.
+ */
+ public void readFromBeginning(Closure<LogEntry> reader)
+ throws CodingException, InvalidPositionException, StreamAccessException {
+
+ Iterator<Entry> entries = stream.readAll();
+
+ while (entries.hasNext()) {
+ LogEntry logEntry = decodeLogEntry(entries.next());
+ // tryDecodeFrame may hand back the entry that interrupted an incomplete frame sequence;
+ // if that entry is itself a frame, decoding is attempted again from it.
+ while (logEntry != null && isFrame(logEntry)) {
+ logEntry = tryDecodeFrame(logEntry.getFrame(), entries);
+ }
+ // null means the frame sequence was unusable and there is nothing to deliver.
+ if (logEntry != null) {
+ if (logEntry.isSet(_Fields.DEFLATED_ENTRY)) {
+ logEntry = Entries.inflate(logEntry);
+ vars.deflatedEntriesRead.incrementAndGet();
+ }
+
+ reader.execute(logEntry);
+ vars.entriesRead.incrementAndGet();
+ }
+ }
+ }
+
+ /**
+ * Attempts to reassemble a framed log entry, consuming chunk entries from the iterator.
+ *
+ * @param frame The frame that was just read; expected to be a frame header.
+ * @param entries Iterator positioned immediately after {@code frame}.
+ * @return The reassembled entry when all chunks were read and the checksum matched. Returns
+ * {@code null} when {@code frame} is not a header or the stream ended before all chunks
+ * were read. Returns the interrupting entry when something other than a chunk was found
+ * where a chunk was expected.
+ * @throws CodingException If an entry could not be decoded, or the reassembled data does not
+ * match the checksum recorded in the header.
+ */
+ @Nullable
+ private LogEntry tryDecodeFrame(Frame frame, Iterator<Entry> entries) throws CodingException {
+ if (!isHeader(frame)) {
+ LOG.warning("Found a frame with no preceding header, skipping.");
+ return null;
+ }
+ FrameHeader header = frame.getHeader();
+ byte[][] chunks = new byte[header.chunkCount][];
+
+ digest.reset();
+ for (int i = 0; i < header.chunkCount; i++) {
+ if (!entries.hasNext()) {
+ logBadFrame(header, i);
+ return null;
+ }
+ LogEntry logEntry = decodeLogEntry(entries.next());
+ // The interrupting entry has already been consumed from the iterator, so it is returned
+ // to the caller instead of being dropped.
+ if (!isFrame(logEntry)) {
+ logBadFrame(header, i);
+ return logEntry;
+ }
+ Frame chunkFrame = logEntry.getFrame();
+ if (!isChunk(chunkFrame)) {
+ logBadFrame(header, i);
+ return logEntry;
+ }
+ byte[] chunkData = chunkFrame.getChunk().getData();
+ digest.update(chunkData);
+ chunks[i] = chunkData;
+ }
+ if (!Arrays.equals(header.getChecksum(), digest.digest())) {
+ throw new CodingException("Read back a framed log entry that failed its checksum");
+ }
+ return Entries.thriftBinaryDecode(Bytes.concat(chunks));
+ }
+
+ /** Returns whether the field set on the log entry union is {@code FRAME}. */
+ private static boolean isFrame(LogEntry logEntry) {
+ return logEntry.getSetField() == LogEntry._Fields.FRAME;
+ }
+
+ /** Returns whether the field set on the frame union is {@code CHUNK}. */
+ private static boolean isChunk(Frame frame) {
+ return frame.getSetField() == Frame._Fields.CHUNK;
+ }
+
+ /** Returns whether the field set on the frame union is {@code HEADER}. */
+ private static boolean isHeader(Frame frame) {
+ return frame.getSetField() == Frame._Fields.HEADER;
+ }
+
+ /**
+ * Logs an incomplete frame sequence and counts it in the bad-frames stat.
+ *
+ * @param header Header announcing the expected number of chunks.
+ * @param chunkIndex Number of chunks read before the sequence broke off.
+ */
+ private void logBadFrame(FrameHeader header, int chunkIndex) {
+ String message = String.format(
+ "Found an aborted transaction, required %d frames and found %d",
+ header.chunkCount,
+ chunkIndex);
+ LOG.info(message);
+ vars.badFramesRead.incrementAndGet();
+ }
+
+ private LogEntry decodeLogEntry(Entry entry) throws CodingException {
+ byte[] contents = entry.contents();
+ vars.bytesRead.addAndGet(contents.length);
+ return Entries.thriftBinaryDecode(contents);
+ }
+
+ /**
+ * Truncates all entries in the log stream occurring before the given position. The entry at the
+ * given position becomes the first entry in the stream when this call completes.
+ *
+ * @param position The first position to keep in the stream.
+ * @throws InvalidPositionException if the specified position does not exist in this log.
+ * @throws StreamAccessException if the stream could not be truncated.
+ */
+ void truncateBefore(Position position) {
+ stream.truncateBefore(position);
+ }
+
+ /**
+ * Starts a transaction that can be used to commit a series of {@link Op}s to the log stream
+ * atomically.
+ *
+ * @return A new transaction that batches up ops until it is committed to the underlying
+ * stream.
+ */
+ StreamTransaction startTransaction() {
+ return new StreamTransaction();
+ }
+
+ /**
+ * Appends a snapshot to the log and, once the append has succeeded, truncates every log entry
+ * that precedes it. The snapshot entry is deflated first when this manager was configured to
+ * deflate snapshots.
+ *
+ * @param snapshot The snapshot to add.
+ * @throws CodingException if there was a problem encoding the snapshot into a log entry.
+ * @throws InvalidPositionException if there was a problem truncating before the snapshot.
+ * @throws StreamAccessException if there was a problem appending the snapshot to the log.
+ */
+ @Timed("log_manager_snapshot")
+ void snapshot(Snapshot snapshot)
+ throws CodingException, InvalidPositionException, StreamAccessException {
+
+ LogEntry plainEntry = LogEntry.snapshot(snapshot);
+ LogEntry toWrite = deflateSnapshots ? Entries.deflate(plainEntry) : plainEntry;
+
+ Position snapshotPosition = appendAndGetPosition(toWrite);
+ vars.snapshots.incrementAndGet();
+ // Everything before the snapshot is now covered by it.
+ vars.unSnapshottedTransactions.set(0);
+ stream.truncateBefore(snapshotPosition);
+ }
+
+ /**
+ * Serializes a log entry and appends the resulting record(s) to the stream as a unit.
+ *
+ * @param logEntry The entry to append.
+ * @return The position of the first record written for the entry.
+ * @throws CodingException If the entry could not be serialized.
+ */
+ @Timed("log_manager_append")
+ private Position appendAndGetPosition(LogEntry logEntry) throws CodingException {
+ Position firstPosition = null;
+ synchronized (writeMutex) { // ensure all sub-entries are written as a unit
+ // Serialize under the lock too: the serializer checksums large entries with the
+ // MessageDigest instance it shares with this manager, and MessageDigest is not safe for
+ // concurrent use by multiple appending threads.
+ byte[][] entries = entrySerializer.serialize(logEntry);
+ for (byte[] entry : entries) {
+ Position position = stream.append(entry);
+ if (firstPosition == null) {
+ firstPosition = position;
+ }
+ vars.bytesWritten.addAndGet(entry.length);
+ }
+ }
+ vars.entriesWritten.incrementAndGet();
+ return firstPosition;
+ }
+
+ /**
+ * Turns log entries into the byte records written to the stream. An entry that fits within
+ * the maximum size becomes a single record; a larger one becomes a checksummed header frame
+ * followed by chunk frames.
+ */
+ @VisibleForTesting
+ public static class EntrySerializer {
+ private final MessageDigest digest;
+ private final int maxEntrySizeBytes;
+
+ private EntrySerializer(MessageDigest digest, Amount<Integer, Data> maxEntrySize) {
+ this.digest = checkNotNull(digest);
+ maxEntrySizeBytes = maxEntrySize.as(Data.BYTES);
+ }
+
+ public EntrySerializer(Amount<Integer, Data> maxEntrySize) {
+ this(createDigest(), maxEntrySize);
+ }
+
+ /**
+ * Encodes a log entry and, when the encoded form exceeds {@code maxEntrySizeBytes}, splits it
+ * into chunks of at most that many bytes.
+ *
+ * @param logEntry The log entry to serialize.
+ * @return Either the single encoded entry, or a header frame followed by its chunk frames.
+ * @throws CodingException If the entry could not be serialized.
+ */
+ @VisibleForTesting
+ public byte[][] serialize(LogEntry logEntry) throws CodingException {
+ byte[] encoded = Entries.thriftBinaryEncode(logEntry);
+ int totalBytes = encoded.length;
+ if (totalBytes <= maxEntrySizeBytes) {
+ return new byte[][] {encoded};
+ }
+
+ int chunkCount = (int) Math.ceil(totalBytes / (double) maxEntrySizeBytes);
+ byte[][] records = new byte[chunkCount + 1][];
+
+ // Record 0 is the header carrying the chunk count and the checksum of the whole entry.
+ FrameHeader header = new FrameHeader(chunkCount, ByteBuffer.wrap(checksum(encoded)));
+ records[0] = encode(Frame.header(header));
+
+ for (int record = 1; record <= chunkCount; record++) {
+ int offset = (record - 1) * maxEntrySizeBytes;
+ int length = Math.min(maxEntrySizeBytes, totalBytes - offset);
+ FrameChunk chunk = new FrameChunk(ByteBuffer.wrap(encoded, offset, length));
+ records[record] = encode(Frame.chunk(chunk));
+ }
+ return records;
+ }
+
+ private byte[] checksum(byte[] data) {
+ digest.reset();
+ digest.update(data);
+ return digest.digest();
+ }
+
+ private static byte[] encode(Frame frame) throws CodingException {
+ LogEntry wrapped = LogEntry.frame(frame);
+ return Entries.thriftBinaryEncode(wrapped);
+ }
+ }
+
+ /**
+ * Manages a single log stream append transaction. Local storage ops can be added to the
+ * transaction and then later committed as an atomic unit.
+ */
+ final class StreamTransaction {
+ private final Transaction transaction =
+ new Transaction().setSchemaVersion(storageConstants.CURRENT_SCHEMA_VERSION);
+ private final AtomicBoolean committed = new AtomicBoolean(false);
+
+ private StreamTransaction() {
+ // supplied by factory method
+ }
+
+ /**
+ * Writes the ops collected in this transaction to the log stream as one atomic record. A
+ * transaction may be committed only once.
+ *
+ * @return The position of the log entry committed in this transaction, or {@code null} when
+ * no ops were added and nothing was written.
+ * @throws CodingException If there was a problem encoding a log entry for commit.
+ */
+ Position commit() throws CodingException {
+ boolean alreadyCommitted = committed.getAndSet(true);
+ Preconditions.checkState(!alreadyCommitted, "Can only call commit once per transaction.");
+
+ if (transaction.isSetOps()) {
+ Position committedAt = appendAndGetPosition(LogEntry.transaction(transaction));
+ vars.unSnapshottedTransactions.incrementAndGet();
+ return committedAt;
+ }
+
+ // No ops were added, so there is nothing to append.
+ return null;
+ }
+
+ /**
+ * Records a local storage operation in this transaction. The op is merged into the most
+ * recently added op when the two can be coalesced, and appended otherwise.
+ *
+ * @param op The local storage op to add.
+ */
+ void add(Op op) {
+ Preconditions.checkState(!committed.get());
+
+ Op previous = null;
+ if (transaction.isSetOps()) {
+ previous = Iterables.getLast(transaction.getOps(), null);
+ }
+
+ boolean merged = previous != null && coalesce(previous, op);
+ if (!merged) {
+ transaction.addToOps(op);
+ }
+ }
+
+ /**
+ * Tries to coalesce a new op into the prior to compact the binary representation and increase
+ * batching.
+ *
+ * <p>It's recommended that as new {@code Op}s are added, they be treated here although they
+ * need not be</p>
+ *
+ * @param prior The previous op.
+ * @param next The next op to be added.
+ * @return {@code true} if the next op was coalesced into the prior, {@code false} otherwise.
+ */
+ private boolean coalesce(Op prior, Op next) {
+ // Either op being unset rules out coalescing. Checking both with '||' also keeps
+ // getSetField() below from returning null for an unset prior.
+ if (!prior.isSet() || !next.isSet()) {
+ return false;
+ }
+
+ Op._Fields priorType = prior.getSetField();
+ if (!priorType.equals(next.getSetField())) {
+ return false;
+ }
+
+ switch (priorType) {
+ case SAVE_FRAMEWORK_ID:
+ // The later framework ID simply replaces the earlier one.
+ prior.setSaveFrameworkId(next.getSaveFrameworkId());
+ return true;
+
+ case SAVE_ACCEPTED_JOB:
+ case REMOVE_JOB:
+ case SAVE_QUOTA:
+ case REMOVE_QUOTA:
+ return false;
+
+ case SAVE_TASKS:
+ coalesce(prior.getSaveTasks(), next.getSaveTasks());
+ return true;
+ case REMOVE_TASKS:
+ coalesce(prior.getRemoveTasks(), next.getRemoveTasks());
+ return true;
+ case SAVE_HOST_ATTRIBUTES:
+ return coalesce(prior.getSaveHostAttributes(), next.getSaveHostAttributes());
+ default:
+ LOG.warning("Unoptimized op: " + priorType);
+ return false;
+ }
+ }
+
+ private void coalesce(SaveTasks prior, SaveTasks next) {
+ if (next.isSetTasks()) {
+ if (prior.isSetTasks()) {
+ // It is an expected invariant that an operation may reference a task (identified by
+ // task ID) no more than one time. Therefore, to coalesce two SaveTasks operations,
+ // the most recent task definition overrides the prior operation.
+ Map<String, ScheduledTask> coalesced = Maps.newHashMap();
+ for (ScheduledTask task : prior.getTasks()) {
+ coalesced.put(task.getAssignedTask().getTaskId(), task);
+ }
+ for (ScheduledTask task : next.getTasks()) {
+ coalesced.put(task.getAssignedTask().getTaskId(), task);
+ }
+ prior.setTasks(ImmutableSet.copyOf(coalesced.values()));
+ } else {
+ prior.setTasks(next.getTasks());
+ }
+ }
+ }
+
+ private void coalesce(RemoveTasks prior, RemoveTasks next) {
+ if (next.isSetTaskIds()) {
+ if (prior.isSetTaskIds()) {
+ prior.setTaskIds(ImmutableSet.<String>builder()
+ .addAll(prior.getTaskIds())
+ .addAll(next.getTaskIds())
+ .build());
+ } else {
+ prior.setTaskIds(next.getTaskIds());
+ }
+ }
+ }
+
+ private boolean coalesce(SaveHostAttributes prior, SaveHostAttributes next) {
+ if (prior.getHostAttributes().getHost().equals(next.getHostAttributes().getHost())) {
+ prior.getHostAttributes().setAttributes(next.getHostAttributes().getAttributes());
+ return true;
+ }
+ return false;
+ }
+ }
+ }
+}