You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2013/12/31 22:20:35 UTC
[42/51] [partial] Rename twitter* and com.twitter to apache and
org.apache directories to preserve all file history before the refactor.
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/stats/SlotSizeCounter.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/stats/SlotSizeCounter.java b/src/main/java/com/twitter/aurora/scheduler/stats/SlotSizeCounter.java
deleted file mode 100644
index 6c0bf48..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/stats/SlotSizeCounter.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.stats;
-
-import java.util.Map;
-
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableMap;
-
-import com.twitter.aurora.gen.Quota;
-import com.twitter.aurora.scheduler.quota.Quotas;
-import com.twitter.aurora.scheduler.storage.entities.IQuota;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * A stat computer that aggregates the number of 'slots' available at different pre-determined
- * slot sizes.
- */
-class SlotSizeCounter implements Runnable {
- private static final Map<String, IQuota> SLOT_SIZES = ImmutableMap.of(
- "small", IQuota.build(new Quota(1.0, 1024, 4096)),
- "medium", IQuota.build(new Quota(4.0, 8192, 16384)),
- "large", IQuota.build(new Quota(8.0, 16384, 32768)),
- "xlarge", IQuota.build(new Quota(16.0, 32768, 65536)));
-
- private final Map<String, IQuota> slotSizes;
- private final ResourceSlotProvider resourceSlotProvider;
- private final CachedCounters cachedCounters;
-
- @VisibleForTesting
- SlotSizeCounter(
- final Map<String, IQuota> slotSizes,
- ResourceSlotProvider resourceSlotProvider,
- CachedCounters cachedCounters) {
-
- this.slotSizes = checkNotNull(slotSizes);
- this.resourceSlotProvider = checkNotNull(resourceSlotProvider);
- this.cachedCounters = checkNotNull(cachedCounters);
- }
-
- interface ResourceSlotProvider {
- Iterable<IQuota> get();
- }
-
- @Inject
- SlotSizeCounter(ResourceSlotProvider resourceSlotProvider, CachedCounters cachedCounters) {
- this(SLOT_SIZES, resourceSlotProvider, cachedCounters);
- }
-
- @VisibleForTesting
- static String getStatName(String slotName) {
- return "empty_slots_" + slotName;
- }
-
- private int countSlots(Iterable<IQuota> slots, final IQuota slotSize) {
- Function<IQuota, Integer> counter = new Function<IQuota, Integer>() {
- @Override public Integer apply(IQuota machineSlack) {
- return Quotas.divide(machineSlack, slotSize);
- }
- };
-
- int sum = 0;
- for (int slotCount : FluentIterable.from(slots).transform(counter)) {
- sum += slotCount;
- }
- return sum;
- }
-
- @Override
- public void run() {
- Iterable<IQuota> slots = resourceSlotProvider.get();
- for (Map.Entry<String, IQuota> entry : slotSizes.entrySet()) {
- cachedCounters.get(getStatName(entry.getKey())).set(countSlots(slots, entry.getValue()));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/stats/TaskStatCalculator.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/stats/TaskStatCalculator.java b/src/main/java/com/twitter/aurora/scheduler/stats/TaskStatCalculator.java
deleted file mode 100644
index db34125..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/stats/TaskStatCalculator.java
+++ /dev/null
@@ -1,46 +0,0 @@
-package com.twitter.aurora.scheduler.stats;
-
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import com.twitter.aurora.scheduler.stats.AsyncStatsModule.StatUpdater;
-import com.twitter.aurora.scheduler.stats.ResourceCounter.GlobalMetric;
-import com.twitter.aurora.scheduler.stats.ResourceCounter.Metric;
-import com.twitter.aurora.scheduler.storage.Storage.StorageException;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Calculates and exports aggregate stats about resources consumed by active tasks.
- */
-class TaskStatCalculator implements Runnable {
- private static final Logger LOG = Logger.getLogger(StatUpdater.class.getName());
-
- private final CachedCounters counters;
- private final ResourceCounter resourceCounter;
-
- @Inject
- TaskStatCalculator(ResourceCounter resourceCounter, CachedCounters counters) {
- this.resourceCounter = checkNotNull(resourceCounter);
- this.counters = checkNotNull(counters);
- }
-
- private void update(String prefix, Metric metric) {
- counters.get(prefix + "_cpu").set(metric.getCpu());
- counters.get(prefix + "_ram_gb").set(metric.getRamGb());
- counters.get(prefix + "_disk_gb").set(metric.getDiskGb());
- }
-
- @Override
- public void run() {
- try {
- for (GlobalMetric metric : resourceCounter.computeConsumptionTotals()) {
- update("resources_" + metric.type.name().toLowerCase(), metric);
- }
- update("resources_allocated_quota", resourceCounter.computeQuotaAllocationTotals());
- } catch (StorageException e) {
- LOG.fine("Unable to fetch metrics, storage is likely not ready.");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/storage/AttributeStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/storage/AttributeStore.java b/src/main/java/com/twitter/aurora/scheduler/storage/AttributeStore.java
deleted file mode 100644
index 35c666c..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/storage/AttributeStore.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.storage;
-
-import java.util.Set;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
-
-import com.twitter.aurora.gen.Attribute;
-import com.twitter.aurora.gen.HostAttributes;
-import com.twitter.aurora.gen.MaintenanceMode;
-import com.twitter.aurora.scheduler.storage.Storage.StoreProvider;
-
-/**
- * Storage interface for host attributes.
- */
-public interface AttributeStore {
- /**
- * Fetches all attributes for the given host.
- *
- * @param host host name.
- * @return attributes associated with {@code host}, if the host is known.
- */
- Optional<HostAttributes> getHostAttributes(String host);
-
- /**
- * Fetches all attributes in the store.
- *
- * @return All host attributes.
- */
- Set<HostAttributes> getHostAttributes();
-
- /**
- * Attributes are considered mostly ephemeral and extremely low risk when inconsistency
- * is present.
- */
- public interface Mutable extends AttributeStore {
-
- /**
- * Deletes all attributes in the store.
- */
- void deleteHostAttributes();
-
- /**
- * Save a host attribute in the attribute store.
- *
- * @param hostAttributes The attribute we are going to save.
- */
- void saveHostAttributes(HostAttributes hostAttributes);
-
- /**
- * Adjusts the maintenance mode for a host.
- * No adjustment will be made if the host is unknown.
- *
- * @param host Host to adjust.
- * @param mode Mode to place the host in.
- * @return {@code true} if the host is known and the state was adjusted,
- * {@code false} if the host is unrecognized.
- */
- boolean setMaintenanceMode(String host, MaintenanceMode mode);
- }
-
- public static final class Util {
- private Util() {
- }
-
- /**
- * Fetches attributes about a {@code host}.
- *
- * @param store Store to fetch host attributes from.
- * @param host Host to fetch attributes about.
- * @return Attributes associated with {@code host}, or an empty iterable if the host is
- * unknown.
- */
- public static Iterable<Attribute> attributesOrNone(StoreProvider store, String host) {
- Optional<HostAttributes> attributes = store.getAttributeStore().getHostAttributes(host);
- return attributes.isPresent()
- ? attributes.get().getAttributes() : ImmutableList.<Attribute>of();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/storage/CallOrderEnforcingStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/storage/CallOrderEnforcingStorage.java b/src/main/java/com/twitter/aurora/scheduler/storage/CallOrderEnforcingStorage.java
deleted file mode 100644
index c71a114..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/storage/CallOrderEnforcingStorage.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.storage;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-import javax.inject.Inject;
-import javax.inject.Singleton;
-
-import com.google.inject.BindingAnnotation;
-import com.google.inject.Module;
-import com.google.inject.PrivateModule;
-
-import com.twitter.aurora.scheduler.events.PubsubEvent.Interceptors.Event;
-import com.twitter.aurora.scheduler.events.PubsubEvent.Interceptors.SendNotification;
-import com.twitter.aurora.scheduler.storage.Storage.MutateWork.NoResult.Quiet;
-import com.twitter.aurora.scheduler.storage.Storage.NonVolatileStorage;
-import com.twitter.common.util.StateMachine;
-
-/**
- * A non-volatile storage wrapper that enforces method call ordering.
- */
-public class CallOrderEnforcingStorage implements NonVolatileStorage {
-
- /**
- * Identifies a storage whose call order should be enforced.
- */
- @Retention(RetentionPolicy.RUNTIME)
- @Target({ ElementType.PARAMETER, ElementType.METHOD })
- @BindingAnnotation
- private @interface EnforceOrderOn { }
-
- private final NonVolatileStorage wrapped;
-
- private enum State {
- CONSTRUCTED,
- PREPARED,
- READY,
- STOPPED
- }
-
- private final StateMachine<State> stateMachine = StateMachine.<State>builder("storage")
- .logTransitions()
- .initialState(State.CONSTRUCTED)
- .addState(State.CONSTRUCTED, State.PREPARED)
- .addState(State.PREPARED, State.READY)
- .addState(State.READY, State.STOPPED)
- .build();
-
- @Inject
- CallOrderEnforcingStorage(@EnforceOrderOn NonVolatileStorage wrapped) {
- this.wrapped = wrapped;
- }
-
- private void checkInState(State state) throws StorageException {
- if (stateMachine.getState() != state) {
- throw new StorageException("Storage is not " + state);
- }
- }
-
- @Override
- public void prepare() throws StorageException {
- checkInState(State.CONSTRUCTED);
- wrapped.prepare();
- stateMachine.transition(State.PREPARED);
- }
-
- @SendNotification(after = Event.StorageStarted)
- @Override
- public void start(Quiet initializationLogic) throws StorageException {
- checkInState(State.PREPARED);
- wrapped.start(initializationLogic);
- stateMachine.transition(State.READY);
- }
-
- @Override
- public void stop() {
- wrapped.stop();
- stateMachine.transition(State.STOPPED);
- }
-
- @Override
- public <T, E extends Exception> T consistentRead(Work<T, E> work) throws StorageException, E {
- checkInState(State.READY);
- return wrapped.consistentRead(work);
- }
-
- @Override
- public <T, E extends Exception> T weaklyConsistentRead(Work<T, E> work)
- throws StorageException, E {
-
- checkInState(State.READY);
- return wrapped.weaklyConsistentRead(work);
- }
-
- @Override
- public <T, E extends Exception> T write(MutateWork<T, E> work)
- throws StorageException, E {
- checkInState(State.READY);
- return wrapped.write(work);
- }
-
- @Override
- public void snapshot() throws StorageException {
- checkInState(State.READY);
- wrapped.snapshot();
- }
-
- /**
- * Creates a binding module that will wrap a storage class with {@link CallOrderEnforcingStorage},
- * exposing the order-enforced storage as {@link Storage} and {@link NonVolatileStorage}.
- *
- * @param storageClass Non-volatile storage implementation class.
- * @return Binding module.
- */
- public static Module wrappingModule(final Class<? extends NonVolatileStorage> storageClass) {
- return new PrivateModule() {
- @Override protected void configure() {
- bind(Storage.class).to(CallOrderEnforcingStorage.class);
- bind(NonVolatileStorage.class).to(CallOrderEnforcingStorage.class);
- bind(CallOrderEnforcingStorage.class).in(Singleton.class);
- bind(NonVolatileStorage.class).annotatedWith(EnforceOrderOn.class).to(storageClass);
- expose(Storage.class);
- expose(NonVolatileStorage.class);
- }
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/storage/DistributedSnapshotStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/storage/DistributedSnapshotStore.java b/src/main/java/com/twitter/aurora/scheduler/storage/DistributedSnapshotStore.java
deleted file mode 100644
index cba6303..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/storage/DistributedSnapshotStore.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.storage;
-
-import com.twitter.aurora.codec.ThriftBinaryCodec.CodingException;
-import com.twitter.aurora.gen.storage.Snapshot;
-
-/**
- * A distributed snapshot store that supports persisting globally-visible snapshots.
- */
-public interface DistributedSnapshotStore {
- /**
- * Writes a snapshot to the distributed storage system.
- * TODO(William Farner): Currently we're hiding some exceptions (which happen to be
- * RuntimeExceptions). Clean these up to be checked, and throw another exception type here.
- *
- * @param snapshot Snapshot to write.
- * @throws CodingException If the snapshot could not be serialized.
- */
- void persist(Snapshot snapshot) throws CodingException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/storage/ForwardingStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/storage/ForwardingStore.java b/src/main/java/com/twitter/aurora/scheduler/storage/ForwardingStore.java
deleted file mode 100644
index 1a6a849..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/storage/ForwardingStore.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.storage;
-
-import java.util.Map;
-import java.util.Set;
-
-import javax.annotation.Nullable;
-
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableSet;
-
-import com.twitter.aurora.gen.HostAttributes;
-import com.twitter.aurora.gen.MaintenanceMode;
-import com.twitter.aurora.scheduler.base.Query;
-import com.twitter.aurora.scheduler.storage.entities.IJobConfiguration;
-import com.twitter.aurora.scheduler.storage.entities.IJobKey;
-import com.twitter.aurora.scheduler.storage.entities.ILock;
-import com.twitter.aurora.scheduler.storage.entities.ILockKey;
-import com.twitter.aurora.scheduler.storage.entities.IQuota;
-import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
-import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * A store that forwards all its operations to underlying storage systems. Useful for decorating
- * an existing storage system.
- */
-public class ForwardingStore implements
- Storage,
- SchedulerStore.Mutable,
- JobStore.Mutable,
- TaskStore.Mutable,
- LockStore.Mutable,
- QuotaStore.Mutable,
- AttributeStore.Mutable {
-
- private final Storage storage;
- private final SchedulerStore.Mutable schedulerStore;
- private final JobStore.Mutable jobStore;
- private final TaskStore.Mutable taskStore;
- private final LockStore.Mutable lockStore;
- private final QuotaStore.Mutable quotaStore;
- private final AttributeStore.Mutable attributeStore;
-
- /**
- * Creates a new forwarding store that delegates to the provided default stores.
- *
- * @param storage Delegate.
- * @param schedulerStore Delegate.
- * @param jobStore Delegate.
- * @param taskStore Delegate.
- * @param lockStore Delegate.
- * @param quotaStore Delegate.
- * @param attributeStore Delegate.
- */
- public ForwardingStore(
- Storage storage,
- SchedulerStore.Mutable schedulerStore,
- JobStore.Mutable jobStore,
- TaskStore.Mutable taskStore,
- LockStore.Mutable lockStore,
- QuotaStore.Mutable quotaStore,
- AttributeStore.Mutable attributeStore) {
-
- this.storage = checkNotNull(storage);
- this.schedulerStore = checkNotNull(schedulerStore);
- this.jobStore = checkNotNull(jobStore);
- this.taskStore = checkNotNull(taskStore);
- this.lockStore = checkNotNull(lockStore);
- this.quotaStore = checkNotNull(quotaStore);
- this.attributeStore = checkNotNull(attributeStore);
- }
-
- @Override
- public <T, E extends Exception> T consistentRead(Work<T, E> work) throws StorageException, E {
- return storage.consistentRead(work);
- }
-
- @Override
- public <T, E extends Exception> T weaklyConsistentRead(Work<T, E> work)
- throws StorageException, E {
- return storage.weaklyConsistentRead(work);
- }
-
- @Override
- public <T, E extends Exception> T write(MutateWork<T, E> work)
- throws StorageException, E {
-
- return storage.write(work);
- }
-
- @Override
- public void snapshot() throws StorageException {
- storage.snapshot();
- }
-
- @Override
- public void saveFrameworkId(String frameworkId) {
- schedulerStore.saveFrameworkId(frameworkId);
- }
-
- @Override
- @Nullable
- public String fetchFrameworkId() {
- return schedulerStore.fetchFrameworkId();
- }
-
- @Override
- public Iterable<IJobConfiguration> fetchJobs(String managerId) {
- return jobStore.fetchJobs(managerId);
- }
-
- @Override
- public Optional<IJobConfiguration> fetchJob(String managerId, IJobKey jobKey) {
- return jobStore.fetchJob(managerId, jobKey);
- }
-
- @Override
- public void saveAcceptedJob(String managerId, IJobConfiguration jobConfig) {
- jobStore.saveAcceptedJob(managerId, jobConfig);
- }
-
- @Override
- public void removeJob(IJobKey jobKey) {
- jobStore.removeJob(jobKey);
- }
-
- @Override
- public void deleteJobs() {
- jobStore.deleteJobs();
- }
-
- @Override
- public Set<String> fetchManagerIds() {
- return jobStore.fetchManagerIds();
- }
-
- @Override
- public void saveTasks(Set<IScheduledTask> tasks) throws IllegalStateException {
- taskStore.saveTasks(tasks);
- }
-
- @Override
- public void deleteAllTasks() {
- taskStore.deleteAllTasks();
- }
-
- @Override
- public void deleteTasks(Set<String> taskIds) {
- taskStore.deleteTasks(taskIds);
- }
-
- @Override
- public ImmutableSet<IScheduledTask> mutateTasks(
- Query.Builder query,
- Function<IScheduledTask, IScheduledTask> mutator) {
-
- return taskStore.mutateTasks(query, mutator);
- }
-
- @Override
- public boolean unsafeModifyInPlace(String taskId, ITaskConfig taskConfiguration) {
- return taskStore.unsafeModifyInPlace(taskId, taskConfiguration);
- }
-
- @Override
- public ImmutableSet<IScheduledTask> fetchTasks(Query.Builder querySupplier) {
- return taskStore.fetchTasks(querySupplier);
- }
-
- @Override
- public Set<ILock> fetchLocks() {
- return lockStore.fetchLocks();
- }
-
- @Override
- public Optional<ILock> fetchLock(ILockKey lockKey) {
- return lockStore.fetchLock(lockKey);
- }
-
- @Override
- public void saveLock(ILock lock) {
- lockStore.saveLock(lock);
- }
-
- @Override
- public void removeLock(ILockKey lockKey) {
- lockStore.removeLock(lockKey);
- }
-
- @Override
- public void deleteLocks() {
- lockStore.deleteLocks();
- }
-
- @Override
- public Map<String, IQuota> fetchQuotas() {
- return quotaStore.fetchQuotas();
- }
-
- @Override
- public void removeQuota(String role) {
- quotaStore.removeQuota(role);
- }
-
- @Override
- public void deleteQuotas() {
- quotaStore.deleteQuotas();
- }
-
- @Override
- public void saveQuota(String role, IQuota quota) {
- quotaStore.saveQuota(role, quota);
- }
-
- @Override
- public Optional<IQuota> fetchQuota(String role) {
- return quotaStore.fetchQuota(role);
- }
-
- @Override
- public void saveHostAttributes(HostAttributes hostAttribute) {
- attributeStore.saveHostAttributes(hostAttribute);
- }
-
- @Override
- public Optional<HostAttributes> getHostAttributes(String host) {
- return attributeStore.getHostAttributes(host);
- }
-
- @Override
- public Set<HostAttributes> getHostAttributes() {
- return attributeStore.getHostAttributes();
- }
-
- @Override
- public void deleteHostAttributes() {
- attributeStore.deleteHostAttributes();
- }
-
- @Override
- public boolean setMaintenanceMode(String host, MaintenanceMode mode) {
- return attributeStore.setMaintenanceMode(host, mode);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/storage/JobStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/storage/JobStore.java b/src/main/java/com/twitter/aurora/scheduler/storage/JobStore.java
deleted file mode 100644
index 7092e91..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/storage/JobStore.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.storage;
-
-import java.util.Set;
-
-import com.google.common.base.Optional;
-
-import com.twitter.aurora.scheduler.storage.entities.IJobConfiguration;
-import com.twitter.aurora.scheduler.storage.entities.IJobKey;
-
-/**
- * Stores job configuration data.
- */
-public interface JobStore {
-
- /**
- * Fetches all {@code JobConfiguration}s for jobs owned by the manager identified by
- * {@code managerId}; if there are none then an empty set is returned.
- *
- * @param managerId The unique identifier of the manager to find registered jobs for.
- * @return the set of job configurations owned by the specified job manager
- */
- Iterable<IJobConfiguration> fetchJobs(String managerId);
-
- /**
- * Fetches the {@code JobConfiguration} for the specified {@code jobKey} if it exists.
- *
- * @param managerId The unique identifier of the manager that accepted the job.
- * @param jobKey The jobKey identifying the job to be fetched.
- * @return the job configuration for the given {@code jobKey} or absent if none is found.
- */
- Optional<IJobConfiguration> fetchJob(String managerId, IJobKey jobKey);
-
- /**
- * Fetches all the unique manager ids that are present in the job store.
- *
- * @return The IDs of all stored job managers.
- */
- Set<String> fetchManagerIds();
-
- public interface Mutable extends JobStore {
- /**
- * Saves the job configuration for a job that has been accepted by the scheduler. Acts as an
- * update if the managerId already exists.
- * TODO(William Farner): Consider accepting SanitizedConfiguration here to require that
- * validation always happens for things entering storage.
- *
- * @param managerId The unique id of the manager that accepted the job.
- * @param jobConfig The configuration of the accepted job.
- */
- void saveAcceptedJob(String managerId, IJobConfiguration jobConfig);
-
- /**
- * Removes the job configuration for the job identified by {@code jobKey}.
- * If there is no stored configuration for the identified job, this method returns silently.
- *
- * @param jobKey the key identifying the job to delete.
- */
- void removeJob(IJobKey jobKey);
-
- /**
- * Deletes all jobs.
- */
- void deleteJobs();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/storage/LockStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/storage/LockStore.java b/src/main/java/com/twitter/aurora/scheduler/storage/LockStore.java
deleted file mode 100644
index 13d7317..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/storage/LockStore.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.storage;
-
-import java.util.Set;
-
-import com.google.common.base.Optional;
-
-import com.twitter.aurora.scheduler.storage.entities.ILock;
-import com.twitter.aurora.scheduler.storage.entities.ILockKey;
-
-/**
- * Stores all lock-related data and defines methods for saving, deleting and fetching locks.
- */
-public interface LockStore {
- /**
- * Fetches all locks available in the store.
- *
- * @return All locks in the store.
- */
- Set<ILock> fetchLocks();
-
- /**
- * Fetches a lock by its key.
- *
- * @param lockKey Key of the lock to fetch.
- * @return Optional lock.
- */
- Optional<ILock> fetchLock(ILockKey lockKey);
-
- public interface Mutable extends LockStore {
- /**
- * Saves a new lock or overwrites the existing one with same LockKey.
- *
- * @param lock ILock to save.
- */
- void saveLock(ILock lock);
-
- /**
- * Removes the lock from the store.
- *
- * @param lockKey Key of the lock to remove.
- */
- void removeLock(ILockKey lockKey);
-
- /**
- * Deletes all locks from the store.
- */
- void deleteLocks();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/storage/QuotaStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/storage/QuotaStore.java b/src/main/java/com/twitter/aurora/scheduler/storage/QuotaStore.java
deleted file mode 100644
index ba8116b..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/storage/QuotaStore.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.storage;
-
-import java.util.Map;
-
-import com.google.common.base.Optional;
-
-import com.twitter.aurora.scheduler.storage.entities.IQuota;
-
-/**
- * Point of storage for quota records.
- */
-public interface QuotaStore {
- /**
- * Fetches the existing quota record for a role.
- *
- * @param role Role to fetch quota for.
- * @return Optional quota associated with {@code role}.
- */
- Optional<IQuota> fetchQuota(String role);
-
- /**
- * Fetches all allocated quotas.
- *
- * @return All allocated quotas.
- */
- Map<String, IQuota> fetchQuotas();
-
- public interface Mutable extends QuotaStore {
-
- /**
- * Deletes all quotas.
- */
- void deleteQuotas();
-
- /**
- * Deletes quota for a role.
- *
- * @param role Role to remove quota record for.
- */
- void removeQuota(String role);
-
- /**
- * Saves a quota record for a role.
- *
- * @param role Role to create or update a quota record for.
- * @param quota Quota to save.
- */
- void saveQuota(String role, IQuota quota);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/storage/ReadWriteLockManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/storage/ReadWriteLockManager.java b/src/main/java/com/twitter/aurora/scheduler/storage/ReadWriteLockManager.java
deleted file mode 100644
index 58d7a68..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/storage/ReadWriteLockManager.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.storage;
-
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import com.google.common.base.Preconditions;
-
-/**
- * A lock manager that wraps a ReadWriteLock and detects ill-fated attempts to upgrade
- * a read-locked thread to a write-locked thread, which would otherwise deadlock.
- */
-public class ReadWriteLockManager {
- private final ReadWriteLock lock = new ReentrantReadWriteLock();
-
- enum LockMode {
- NONE,
- READ,
- WRITE
- }
-
- private static class LockState {
- private LockMode initialLockMode = LockMode.NONE;
- private int lockCount = 0;
-
- private boolean lockAcquired(LockMode mode) {
- boolean stateChanged = false;
- if (initialLockMode == LockMode.NONE) {
- initialLockMode = mode;
- stateChanged = true;
- }
- if (initialLockMode == mode) {
- lockCount++;
- }
- return stateChanged;
- }
-
- private void lockReleased(LockMode mode) {
- if (initialLockMode == mode) {
- lockCount--;
- if (lockCount == 0) {
- initialLockMode = LockMode.NONE;
- }
- }
- }
- }
-
- private final ThreadLocal<LockState> lockState = new ThreadLocal<LockState>() {
- @Override protected LockState initialValue() {
- return new LockState();
- }
- };
-
- /**
- * Blocks until this thread has acquired a read lock.
- *
- * @return {@code true} if the lock was newly-acquired, or {@code false} if this thread already
- * holds a read or write lock that it has yet to release.
- */
- public boolean readLock() {
- lock.readLock().lock();
- return lockState.get().lockAcquired(LockMode.READ);
- }
-
- /**
- * Releases this thread's read lock.
- */
- public void readUnlock() {
- lock.readLock().unlock();
- lockState.get().lockReleased(LockMode.READ);
- }
-
- /**
- * Blocks until this thread has acquired a write lock.
- *
- * @return {@code true} if the lock was newly-acquired, or {@code false} if this thread previously
- * secured the write lock and has yet to release it.
- */
- public boolean writeLock() {
- Preconditions.checkState(lockState.get().initialLockMode != LockMode.READ,
- "A read operation may not be upgraded to a write operation.");
-
- lock.writeLock().lock();
- return lockState.get().lockAcquired(LockMode.WRITE);
- }
-
- /**
- * Releases this thread's write lock.
- */
- public void writeUnlock() {
- lock.writeLock().unlock();
- lockState.get().lockReleased(LockMode.WRITE);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/storage/SchedulerStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/storage/SchedulerStore.java b/src/main/java/com/twitter/aurora/scheduler/storage/SchedulerStore.java
deleted file mode 100644
index 504b90b..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/storage/SchedulerStore.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.storage;
-
-import javax.annotation.Nullable;
-
-/**
- * Stores data specific to the scheduler itself.
- */
-public interface SchedulerStore {
-
- /**
- * Fetches the last saved framework id. If none is saved, null can be returned.
- *
- * @return the last saved framework id
- */
- @Nullable String fetchFrameworkId();
-
- public interface Mutable extends SchedulerStore {
- /**
- * Stores the given framework id overwriting any previously saved id.
- *
- * @param frameworkId The framework id to store.
- */
- void saveFrameworkId(String frameworkId);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/storage/SnapshotStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/storage/SnapshotStore.java b/src/main/java/com/twitter/aurora/scheduler/storage/SnapshotStore.java
deleted file mode 100644
index c775207..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/storage/SnapshotStore.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.storage;
-
-/**
- * Storage mechanism that is able to create complete snapshots of the local storage system state
- * and apply these to restore local storage from a snapshotted baseline.
- */
-public interface SnapshotStore<T> {
-
- /**
- * Creates a consistent snapshot of the local storage system.
- *
- * @return A blob that can be used to recover local storage via {@link #applySnapshot(Object)}.
- */
- T createSnapshot();
-
- /**
- * Applies a snapshot blob to the local storage system, wiping out all existing data and
- * resetting with the contents of the snapshot.
- *
- * @param snapshot A snapshot blob created by {@link #createSnapshot()}.
- */
- void applySnapshot(T snapshot);
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/storage/Storage.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/storage/Storage.java b/src/main/java/com/twitter/aurora/scheduler/storage/Storage.java
deleted file mode 100644
index fdc9ae7..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/storage/Storage.java
+++ /dev/null
@@ -1,322 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.storage;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableSet;
-import com.google.inject.BindingAnnotation;
-
-import com.twitter.aurora.scheduler.base.Query;
-import com.twitter.aurora.scheduler.base.SchedulerException;
-import com.twitter.aurora.scheduler.storage.entities.IQuota;
-import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
-
-/**
- * Manages scheduler storage operations providing an interface to perform atomic changes.
- */
-public interface Storage {
-
- interface StoreProvider {
- SchedulerStore getSchedulerStore();
- JobStore getJobStore();
- TaskStore getTaskStore();
- LockStore getLockStore();
- QuotaStore getQuotaStore();
- AttributeStore getAttributeStore();
- }
-
- interface MutableStoreProvider extends StoreProvider {
- SchedulerStore.Mutable getSchedulerStore();
- JobStore.Mutable getJobStore();
-
- /**
- * Gets access to the mutable task store.
- * <p>
- * This is labeled as unsafe, since it's rare that a caller should be using this. In most
- * cases, mutations to the task store should be done through
- * {@link com.twitter.aurora.scheduler.state.StateManager}.
- * <p>
- * TODO(William Farner): Come up with a way to restrict access to this interface. As it stands,
- * it's trivial for an unsuspecting caller to modify the task store directly and subvert the
- * state machine and side effect systems.
- *
- * @return The mutable task store.
- */
- TaskStore.Mutable getUnsafeTaskStore();
-
- LockStore.Mutable getLockStore();
- QuotaStore.Mutable getQuotaStore();
- AttributeStore.Mutable getAttributeStore();
- }
-
- /**
- * Encapsulates a read-only storage operation.
- *
- * @param <T> The type of result this unit of work produces.
- * @param <E> The type of exception this unit of work can throw.
- */
- interface Work<T, E extends Exception> {
-
- /**
- * Abstracts a unit of work that has a result, but may also throw a specific exception.
- *
- * @param storeProvider A provider to give access to different available stores.
- * @return the result of the successfully completed unit of work
- * @throws E if the unit of work could not be completed
- */
- T apply(StoreProvider storeProvider) throws E;
-
- /**
- * A convenient typedef for Work that throws no checked exceptions - it runs quietly.
- *
- * @param <T> The type of result this unit of work produces.
- */
- interface Quiet<T> extends Work<T, RuntimeException> {
- // typedef
- }
- }
-
- /**
- * Encapsulates a storage operation, which has mutable storage access.
- *
- * @param <T> The type of result this unit of work produces.
- * @param <E> The type of exception this unit of work can throw.
- */
- interface MutateWork<T, E extends Exception> {
-
- NoResult.Quiet NOOP = new NoResult.Quiet() {
- @Override protected void execute(Storage.MutableStoreProvider storeProvider) {
- // No-op.
- }
- };
-
- /**
- * Abstracts a unit of work that should either commit a set of changes to storage as a side
- * effect of successful completion or else commit no changes at all when an exception is thrown.
- *
- * @param storeProvider A provider to give access to different available stores.
- * @return the result of the successfully completed unit of work
- * @throws E if the unit of work could not be completed
- */
- T apply(MutableStoreProvider storeProvider) throws E;
-
- /**
- * A convenient typedef for MutateWork that throws no checked exceptions - it runs quietly.
- *
- * @param <T> The type of result this unit of work produces.
- */
- interface Quiet<T> extends MutateWork<T, RuntimeException> {
- // typedef
- }
-
- /**
- * Encapsulates work that returns no result.
- *
- * @param <E> The type of exception this unit of work can throw.
- */
- abstract class NoResult<E extends Exception> implements MutateWork<Void, E> {
-
- @Override public final Void apply(MutableStoreProvider storeProvider) throws E {
- execute(storeProvider);
- return null;
- }
-
- /**
- * Similar to {@link #apply(MutableStoreProvider)} except that no result is
- * returned.
- *
- * @param storeProvider A provider to give access to different available stores.
- * @throws E if the unit of work could not be completed
- */
- protected abstract void execute(MutableStoreProvider storeProvider) throws E;
-
- /**
- * A convenient typedef for Work with no result that throws no checked exceptions - it runs
- * quietly.
- */
- public abstract static class Quiet extends NoResult<RuntimeException> {
- // typedef
- }
- }
- }
-
- /**
- * Indicates a problem reading from or writing to stable storage.
- */
- class StorageException extends SchedulerException {
- public StorageException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public StorageException(String message) {
- super(message);
- }
- }
-
- /**
- * Executes the unit of read-only {@code work}. All data in the stores may be expected to be
- * consistent, as the invocation is mutually exclusive of any writes.
- *
- * @param work The unit of work to execute.
- * @param <T> The type of result this unit of work produces.
- * @param <E> The type of exception this unit of work can throw.
- * @return the result when the unit of work completes successfully
- * @throws StorageException if there was a problem reading from stable storage.
- * @throws E bubbled transparently when the unit of work throws
- */
- <T, E extends Exception> T consistentRead(Work<T, E> work) throws StorageException, E;
-
- /**
- * Executes a unit of read-only {@code work}. This is functionally identical to
- * {@link #consistentRead(Work)} with the exception that data in the stores may not be fully
- * consistent.
- *
- * @param work The unit of work to execute.
- * @param <T> The type of result this unit of work produces.
- * @param <E> The type of exception this unit of work can throw.
- * @return the result when the unit of work completes successfully
- * @throws StorageException if there was a problem reading from stable storage.
- * @throws E bubbled transparently when the unit of work throws
- */
- <T, E extends Exception> T weaklyConsistentRead(Work<T, E> work) throws StorageException, E;
-
- /**
- * Executes the unit of mutating {@code work}.
- *
- * @param work The unit of work to execute.
- * @param <T> The type of result this unit of work produces.
- * @param <E> The type of exception this unit of work can throw.
- * @return the result when the unit of work completes successfully
- * @throws StorageException if there was a problem reading from or writing to stable storage.
- * @throws E bubbled transparently when the unit of work throws
- */
- <T, E extends Exception> T write(MutateWork<T, E> work) throws StorageException, E;
-
- /**
- * Clean up the underlying storage by optimizing internal data structures. Does not change
- * externally-visible state but might not run concurrently with write operations.
- */
- void snapshot() throws StorageException;
-
- /**
- * A non-volatile storage that has additional methods to control its lifecycle.
- */
- interface NonVolatileStorage extends Storage {
- /**
- * Requests the underlying storage prepare its data set; ie: initialize schemas, begin syncing
- * out of date data, etc. This method should not block.
- *
- * @throws StorageException if there was a problem preparing storage.
- */
- void prepare() throws StorageException;
-
- /**
- * Prepares the underlying storage for serving traffic.
- *
- * @param initializationLogic work to perform after this storage system is ready but before
- * allowing general use of
- * {@link #consistentRead}.
- * @throws StorageException if there was a problem starting storage.
- */
- void start(MutateWork.NoResult.Quiet initializationLogic) throws StorageException;
-
- /**
- * Prepares the underlying storage system for clean shutdown.
- */
- void stop();
- }
-
- /**
- * Identifies a storage layer that is in-memory only.
- * This generally should only be used when the storage is first starting up, to perform queries
- * related to initially loading the storage.
- */
- @Retention(RetentionPolicy.RUNTIME)
- @Target({ ElementType.PARAMETER, ElementType.METHOD })
- @BindingAnnotation
- public @interface Volatile { }
-
- /**
- * Utility functions for interacting with a Storage instance.
- */
- public final class Util {
-
- private Util() {
- // Utility class.
- }
-
- /**
- * Fetch tasks matching the query returned by {@code query} from {@code storage} in a
- * consistent read operation.
- *
- * @see #weaklyConsistentFetchTasks
- * @param storage Storage instance to query from.
- * @param query Builder of the query to perform.
- * @return Tasks returned from the query.
- */
- public static ImmutableSet<IScheduledTask> consistentFetchTasks(
- Storage storage,
- final Query.Builder query) {
-
- return storage.consistentRead(new Work.Quiet<ImmutableSet<IScheduledTask>>() {
- @Override public ImmutableSet<IScheduledTask> apply(StoreProvider storeProvider) {
- return storeProvider.getTaskStore().fetchTasks(query);
- }
- });
- }
-
- /**
- * Identical to {@link #consistentFetchTasks(Storage, Query.Builder)}, but fetches tasks using a
- * weakly-consistent read operation.
- *
- * @see #consistentFetchTasks
- * @param storage Storage instance to query from.
- * @param query Builder of the query to perform.
- * @return Tasks returned from the query.
- */
- public static ImmutableSet<IScheduledTask> weaklyConsistentFetchTasks(
- Storage storage,
- final Query.Builder query) {
-
- return storage.weaklyConsistentRead(new Work.Quiet<ImmutableSet<IScheduledTask>>() {
- @Override public ImmutableSet<IScheduledTask> apply(StoreProvider storeProvider) {
- return storeProvider.getTaskStore().fetchTasks(query);
- }
- });
- }
-
- /**
- * Fetch quota for {@code role} from {@code storage} in a consistent read operation.
- *
- * @param storage Storage instance to fetch quota from.
- * @param role Role to fetch quota for.
- * @return Quota returned from the fetch operation.
- * @see QuotaStore#fetchQuota(String)
- */
- public static Optional<IQuota> consistentFetchQuota(Storage storage, final String role) {
- return storage.consistentRead(new Work.Quiet<Optional<IQuota>>() {
- @Override public Optional<IQuota> apply(StoreProvider storeProvider) {
- return storeProvider.getQuotaStore().fetchQuota(role);
- }
- });
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/storage/StorageBackfill.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/storage/StorageBackfill.java b/src/main/java/com/twitter/aurora/scheduler/storage/StorageBackfill.java
deleted file mode 100644
index df5b603..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/storage/StorageBackfill.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.storage;
-
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Logger;
-
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Sets;
-
-import com.twitter.aurora.gen.JobConfiguration;
-import com.twitter.aurora.gen.ScheduleStatus;
-import com.twitter.aurora.gen.ScheduledTask;
-import com.twitter.aurora.gen.TaskConfig;
-import com.twitter.aurora.gen.TaskEvent;
-import com.twitter.aurora.scheduler.base.JobKeys;
-import com.twitter.aurora.scheduler.base.Query;
-import com.twitter.aurora.scheduler.base.Tasks;
-import com.twitter.aurora.scheduler.configuration.ConfigurationManager;
-import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import com.twitter.aurora.scheduler.storage.TaskStore.Mutable.TaskMutation;
-import com.twitter.aurora.scheduler.storage.entities.IJobConfiguration;
-import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
-import com.twitter.common.stats.Stats;
-import com.twitter.common.util.Clock;
-
-/**
- * Utility class to contain and perform storage backfill operations.
- */
-public final class StorageBackfill {
-
- private static final Logger LOG = Logger.getLogger(StorageBackfill.class.getName());
-
- private static final AtomicLong SHARD_SANITY_CHECK_FAILS =
- Stats.exportLong("shard_sanity_check_failures");
-
- private StorageBackfill() {
- // Utility class.
- }
-
- private static void backfillJobDefaults(JobStore.Mutable jobStore) {
- for (String id : jobStore.fetchManagerIds()) {
- for (JobConfiguration job : IJobConfiguration.toBuildersList(jobStore.fetchJobs(id))) {
- ConfigurationManager.applyDefaultsIfUnset(job);
- jobStore.saveAcceptedJob(id, IJobConfiguration.build(job));
- }
- }
- }
-
- private static void guaranteeShardUniqueness(
- ScheduledTask task,
- TaskStore.Mutable taskStore,
- Clock clock) {
-
- if (Tasks.isActive(task.getStatus())) {
- // Perform a sanity check on the number of active shards.
- TaskConfig config = task.getAssignedTask().getTask();
- Query.Builder query = Query.instanceScoped(
- JobKeys.from(config.getOwner().getRole(), config.getEnvironment(), config.getJobName()),
- task.getAssignedTask().getInstanceId())
- .active();
- Set<String> activeTasksInShard = FluentIterable.from(taskStore.fetchTasks(query))
- .transform(Tasks.SCHEDULED_TO_ID)
- .toSet();
-
- if (activeTasksInShard.size() > 1) {
- SHARD_SANITY_CHECK_FAILS.incrementAndGet();
- LOG.severe("Active shard sanity check failed when loading " + Tasks.id(task)
- + ", active tasks found: " + activeTasksInShard);
-
- // We want to keep exactly one task from this shard, so sort the IDs and keep the
- // highest (newest) in the hopes that it is legitimately running.
- String newestTask = Iterables.getLast(Sets.newTreeSet(activeTasksInShard));
- if (!Tasks.id(task).equals(newestTask)) {
- task.setStatus(ScheduleStatus.KILLED);
- task.addToTaskEvents(new TaskEvent(clock.nowMillis(), ScheduleStatus.KILLED)
- .setMessage("Killed duplicate shard."));
- // TODO(wfarner): Circle back if this is necessary. Currently there's a race
- // condition between the time the scheduler is actually available without hitting
- // IllegalStateException (see DriverImpl).
- // driver.killTask(Tasks.id(task));
- } else {
- LOG.info("Retaining task " + Tasks.id(task));
- }
- }
- }
- }
-
- private static final AtomicLong BOTH_FIELDS_SET = Stats.exportLong("both_instance_ids_set");
- private static final AtomicLong OLD_FIELD_SET = Stats.exportLong("old_instance_id_set");
- private static final AtomicLong NEW_FIELD_SET = Stats.exportLong("new_instance_id_set");
- private static final AtomicLong FIELDS_INCONSISTENT =
- Stats.exportLong("instance_ids_inconsistent");
-
- /**
- * Ensures backwards-compatibility of the throttled state, which exists in this version but is
- * not handled.
- *
- * @param task Task to possibly rewrite.
- */
- private static void rewriteThrottledState(ScheduledTask task) {
- if (ScheduleStatus.THROTTLED == task.getStatus()) {
- task.setStatus(ScheduleStatus.PENDING);
- }
- }
-
- /**
- * Backfills the storage to make it match any assumptions that may have changed since
- * the structs were first written.
- *
- * @param storeProvider Storage provider.
- * @param clock Clock, used for timestamping backfilled task events.
- */
- public static void backfill(final MutableStoreProvider storeProvider, final Clock clock) {
- backfillJobDefaults(storeProvider.getJobStore());
-
- LOG.info("Performing shard uniqueness sanity check.");
- storeProvider.getUnsafeTaskStore().mutateTasks(Query.unscoped(), new TaskMutation() {
- @Override public IScheduledTask apply(final IScheduledTask task) {
- ScheduledTask builder = task.newBuilder();
- ConfigurationManager.applyDefaultsIfUnset(builder.getAssignedTask().getTask());
- // TODO(ksweeney): Guarantee tasks pass current validation code here and quarantine if they
- // don't.
- guaranteeShardUniqueness(builder, storeProvider.getUnsafeTaskStore(), clock);
- rewriteThrottledState(builder);
- return IScheduledTask.build(builder);
- }
- });
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/storage/TaskStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/storage/TaskStore.java b/src/main/java/com/twitter/aurora/scheduler/storage/TaskStore.java
deleted file mode 100644
index 02e5096..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/storage/TaskStore.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.storage;
-
-import java.util.Set;
-
-import com.google.common.base.Function;
-import com.google.common.collect.ImmutableSet;
-
-import com.twitter.aurora.scheduler.base.Query;
-import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
-import com.twitter.aurora.scheduler.storage.entities.ITaskConfig;
-
-/**
- * Stores all tasks configured with the scheduler.
- */
-public interface TaskStore {
-
- /**
- * Fetches a read-only view of tasks matching a query and filters. Intended for use with a
- * {@link com.twitter.aurora.scheduler.base.Query.Builder}.
- *
- * @param query Builder of the query to identify tasks with.
- * @return A read-only view of matching tasks.
- */
- ImmutableSet<IScheduledTask> fetchTasks(Query.Builder query);
-
- public interface Mutable extends TaskStore {
-
- /**
- * A convenience interface to allow callers to more concisely implement a task mutation.
- */
- public interface TaskMutation extends Function<IScheduledTask, IScheduledTask> {
- }
-
- /**
- * Saves tasks to the store. Tasks are copied internally, meaning that the tasks are stored in
- * the state they were in when the method is called, and further object modifications will not
- * affect the tasks. If any of the tasks already exist in the store, they will be overwritten
- * by the supplied {@code tasks}.
- *
- * @param tasks Tasks to add.
- */
- void saveTasks(Set<IScheduledTask> tasks);
-
- /**
- * Removes all tasks from the store.
- */
- void deleteAllTasks();
-
- /**
- * Deletes specific tasks from the store.
- *
- * @param taskIds IDs of tasks to delete.
- */
- void deleteTasks(Set<String> taskIds);
-
- /**
- * Offers temporary mutable access to tasks. If a task ID is not found, it will be silently
- * skipped, and no corresponding task will be returned.
- *
- * @param query Query to match tasks against.
- * @param mutator The mutate operation.
- * @return Immutable copies of only the tasks that were mutated.
- */
- ImmutableSet<IScheduledTask> mutateTasks(
- Query.Builder query,
- Function<IScheduledTask, IScheduledTask> mutator);
-
- /**
- * Rewrites a task's configuration in-place.
- * <p>
- * <b>WARNING</b>: this is a dangerous operation, and should not be used without exercising
- * great care. This feature should be used as a last-ditch effort to rewrite things that
- * the scheduler otherwise can't (e.g. {@link ITaskConfig#executorConfig}) rewrite in a
- * controlled/tested backfill operation.
- *
- * @param taskId ID of the task to alter.
- * @param taskConfiguration Configuration object to swap with the existing task's
- * configuration.
- * @return {@code true} if the modification took effect, or {@code false} if the task does not
- * exist in the store.
- */
- boolean unsafeModifyInPlace(String taskId, ITaskConfig taskConfiguration);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/storage/backup/BackupModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/storage/backup/BackupModule.java b/src/main/java/com/twitter/aurora/scheduler/storage/backup/BackupModule.java
deleted file mode 100644
index b6beba3..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/storage/backup/BackupModule.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.storage.backup;
-
-import java.io.File;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-import javax.inject.Singleton;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.inject.PrivateModule;
-import com.google.inject.Provides;
-import com.google.inject.TypeLiteral;
-
-import com.twitter.aurora.gen.storage.Snapshot;
-import com.twitter.aurora.scheduler.storage.SnapshotStore;
-import com.twitter.aurora.scheduler.storage.backup.Recovery.RecoveryImpl;
-import com.twitter.aurora.scheduler.storage.backup.StorageBackup.StorageBackupImpl;
-import com.twitter.aurora.scheduler.storage.backup.StorageBackup.StorageBackupImpl.BackupConfig;
-import com.twitter.aurora.scheduler.storage.backup.TemporaryStorage.TemporaryStorageFactory;
-import com.twitter.common.application.Lifecycle;
-import com.twitter.common.args.Arg;
-import com.twitter.common.args.CmdLine;
-import com.twitter.common.args.constraints.NotNull;
-import com.twitter.common.base.Command;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * A module that will periodically save full storage backups to local disk and makes those backups
- * available for on-line recovery.
- * <p>
- * This is a private module: only the backup-wrapping {@code SnapshotStore<Snapshot>},
- * {@link StorageBackup} and {@link Recovery} bindings are exposed to the enclosing injector.
- */
-public class BackupModule extends PrivateModule {
-  private static final Logger LOG = Logger.getLogger(BackupModule.class.getName());
-
-  @CmdLine(name = "backup_interval", help = "Minimum interval on which to write a storage backup.")
-  private static final Arg<Amount<Long, Time>> BACKUP_INTERVAL =
-      Arg.create(Amount.of(1L, Time.HOURS));
-
-  @CmdLine(name = "max_saved_backups",
-      help = "Maximum number of backups to retain before deleting the oldest backups.")
-  private static final Arg<Integer> MAX_SAVED_BACKUPS = Arg.create(48);
-
-  @NotNull
-  @CmdLine(name = "backup_dir",
-      help = "Directory to store backups under. Will be created if it does not exist.")
-  private static final Arg<File> BACKUP_DIR = Arg.create();
-
-  private final Class<? extends SnapshotStore<Snapshot>> snapshotStore;
-
-  // Not validated at construction time; validation (and creation) is deferred to
-  // provideBackupDir() so that it happens when the directory is actually injected.
-  private final File unvalidatedBackupDir;
-
-  /**
-   * Creates a new backup module that writes to the directory given by the {@code backup_dir}
-   * command line argument.
-   *
-   * @param snapshotStore Snapshot store implementation class.
-   */
-  public BackupModule(Class<? extends SnapshotStore<Snapshot>> snapshotStore) {
-    this(BACKUP_DIR.get(), snapshotStore);
-  }
-
-  /**
-   * Creates a new backup module using a given backupDir instead of a flagged one.
-   *
-   * @param backupDir Directory to write backups to.
-   * @param snapshotStore Snapshot store implementation class.
-   */
-  @VisibleForTesting
-  public BackupModule(File backupDir, Class<? extends SnapshotStore<Snapshot>> snapshotStore) {
-    this.unvalidatedBackupDir = checkNotNull(backupDir);
-    this.snapshotStore = checkNotNull(snapshotStore);
-  }
-
-  @Override
-  protected void configure() {
-    TypeLiteral<SnapshotStore<Snapshot>> type = new TypeLiteral<SnapshotStore<Snapshot>>() { };
-
-    // The caller-supplied store becomes the delegate that StorageBackupImpl wraps.
-    bind(type).annotatedWith(StorageBackupImpl.SnapshotDelegate.class).to(snapshotStore);
-
-    // A single StorageBackupImpl instance serves as both the un-annotated snapshot store and the
-    // StorageBackup implementation.
-    bind(type).to(StorageBackupImpl.class);
-    bind(StorageBackup.class).to(StorageBackupImpl.class);
-    bind(StorageBackupImpl.class).in(Singleton.class);
-    expose(type);
-    expose(StorageBackup.class);
-
-    bind(new TypeLiteral<Function<Snapshot, TemporaryStorage>>() { })
-        .to(TemporaryStorageFactory.class);
-
-    // Command is bound privately; within this module it is the shutdown hook handed to
-    // RecoveryImpl.
-    bind(Command.class).to(LifecycleHook.class);
-    bind(Recovery.class).to(RecoveryImpl.class);
-    bind(RecoveryImpl.class).in(Singleton.class);
-    expose(Recovery.class);
-  }
-
-  /**
-   * A command that shuts down the application {@link Lifecycle} when executed.
-   */
-  static class LifecycleHook implements Command {
-    private final Lifecycle lifecycle;
-
-    @Inject LifecycleHook(Lifecycle lifecycle) {
-      this.lifecycle = checkNotNull(lifecycle);
-    }
-
-    @Override public void execute() {
-      lifecycle.shutdown();
-    }
-  }
-
-  /**
-   * Validates the configured backup directory, creating it (and any missing parents) if it does
-   * not exist yet.
-   *
-   * @return The backup directory, which exists, is a directory, and is writable.
-   * @throws IllegalArgumentException If the directory could not be created, the path refers to
-   *     something other than a directory, or the directory is not writable.
-   */
-  @Provides
-  private File provideBackupDir() {
-    if (!unvalidatedBackupDir.exists()) {
-      if (!unvalidatedBackupDir.mkdirs()) {
-        throw new IllegalArgumentException(
-            "Unable to create backup dir " + unvalidatedBackupDir.getPath() + ".");
-      } else {
-        LOG.info("Created backup dir " + unvalidatedBackupDir.getPath() + ".");
-      }
-    }
-
-    // A regular file would pass the writability check below, and the misconfiguration would only
-    // surface later when a backup is written.  Fail fast instead.
-    if (!unvalidatedBackupDir.isDirectory()) {
-      throw new IllegalArgumentException(
-          "Backup dir " + unvalidatedBackupDir.getPath() + " is not a directory.");
-    }
-
-    if (!unvalidatedBackupDir.canWrite()) {
-      throw new IllegalArgumentException(
-          "Backup dir " + unvalidatedBackupDir.getPath() + " is not writable.");
-    }
-
-    return unvalidatedBackupDir;
-  }
-
-  /**
-   * Assembles the backup configuration from the validated directory and command line arguments.
-   *
-   * @param backupDir Validated backup directory.
-   * @return Configuration for {@link StorageBackupImpl}.
-   */
-  @Provides
-  private BackupConfig provideBackupConfig(File backupDir) {
-    return new BackupConfig(backupDir, MAX_SAVED_BACKUPS.get(), BACKUP_INTERVAL.get());
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/storage/backup/Recovery.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/storage/backup/Recovery.java b/src/main/java/com/twitter/aurora/scheduler/storage/backup/Recovery.java
deleted file mode 100644
index 4e91342..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/storage/backup/Recovery.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.storage.backup;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.annotation.Nullable;
-import javax.inject.Inject;
-
-import com.google.common.base.Function;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.io.Files;
-import com.google.common.util.concurrent.Atomics;
-
-import com.twitter.aurora.codec.ThriftBinaryCodec;
-import com.twitter.aurora.codec.ThriftBinaryCodec.CodingException;
-import com.twitter.aurora.gen.storage.Snapshot;
-import com.twitter.aurora.scheduler.base.Query;
-import com.twitter.aurora.scheduler.storage.DistributedSnapshotStore;
-import com.twitter.aurora.scheduler.storage.Storage;
-import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import com.twitter.aurora.scheduler.storage.Storage.MutateWork;
-import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
-import com.twitter.common.base.Command;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * A recovery mechanism that works with {@link StorageBackup} to provide a two-step storage
- * recovery process.
- */
-public interface Recovery {
-
- /**
- * List backups available for recovery.
- *
- * @return Available backup IDs.
- */
- Set<String> listBackups();
-
- /**
- * Loads a backup in 'staging' so that it may be queried and modified prior to committing.
- *
- * @param backupName Name of the backup to load.
- * @throws RecoveryException If the backup could not be found or loaded.
- */
- void stage(String backupName) throws RecoveryException;
-
- /**
- * Queries a staged backup.
- *
- * @param query Builder of query to perform.
- * @return Tasks matching the query.
- * @throws RecoveryException If a backup is not staged, or could not be queried.
- */
- Set<IScheduledTask> query(Query.Builder query) throws RecoveryException;
-
- /**
- * Deletes tasks from a staged backup.
- *
- * @param query Query builder for tasks to delete.
- * @throws RecoveryException If a backup is not staged, or tasks could not be deleted.
- */
- void deleteTasks(Query.Builder query) throws RecoveryException;
-
- /**
- * Unloads a staged backup.
- */
- void unload();
-
- /**
- * Commits a staged backup the main storage system.
- *
- * @throws RecoveryException If a backup is not staged, or the commit failed.
- */
- void commit() throws RecoveryException;
-
- /**
- * Thrown when a recovery operation could not be completed due to internal errors or improper
- * invocation order.
- */
- public static class RecoveryException extends Exception {
- RecoveryException(String message) {
- super(message);
- }
-
- RecoveryException(String message, Throwable cause) {
- super(message, cause);
- }
- }
-
- class RecoveryImpl implements Recovery {
- private final File backupDir;
- private final Function<Snapshot, TemporaryStorage> tempStorageFactory;
- private final AtomicReference<PendingRecovery> recovery;
- private final Storage primaryStorage;
- private final DistributedSnapshotStore distributedStore;
- private final Command shutDownNow;
-
- @Inject
- RecoveryImpl(
- File backupDir,
- Function<Snapshot, TemporaryStorage> tempStorageFactory,
- Storage primaryStorage,
- DistributedSnapshotStore distributedStore,
- Command shutDownNow) {
-
- this.backupDir = checkNotNull(backupDir);
- this.tempStorageFactory = checkNotNull(tempStorageFactory);
- this.recovery = Atomics.newReference();
- this.primaryStorage = checkNotNull(primaryStorage);
- this.distributedStore = checkNotNull(distributedStore);
- this.shutDownNow = checkNotNull(shutDownNow);
- }
-
- @Override public Set<String> listBackups() {
- return ImmutableSet.<String>builder().add(backupDir.list()).build();
- }
-
- @Override public void stage(String backupName) throws RecoveryException {
- File backupFile = new File(backupDir, backupName);
- if (!backupFile.exists()) {
- throw new RecoveryException("Backup " + backupName + " does not exist.");
- }
-
- Snapshot snapshot;
- try {
- snapshot = ThriftBinaryCodec.decode(Snapshot.class, Files.toByteArray(backupFile));
- } catch (CodingException e) {
- throw new RecoveryException("Failed to decode backup " + e, e);
- } catch (IOException e) {
- throw new RecoveryException("Failed to read backup " + e, e);
- }
- boolean applied =
- recovery.compareAndSet(null, new PendingRecovery(tempStorageFactory.apply(snapshot)));
- if (!applied) {
- throw new RecoveryException("Another backup is already loaded.");
- }
- }
-
- private PendingRecovery getLoadedRecovery() throws RecoveryException {
- @Nullable PendingRecovery loaded = this.recovery.get();
- if (loaded == null) {
- throw new RecoveryException("No backup loaded.");
- }
- return loaded;
- }
-
- @Override public Set<IScheduledTask> query(Query.Builder query) throws RecoveryException {
- return getLoadedRecovery().query(query);
- }
-
- @Override public void deleteTasks(Query.Builder query) throws RecoveryException {
- getLoadedRecovery().delete(query);
- }
-
- @Override public void unload() {
- recovery.set(null);
- }
-
- @Override public void commit() throws RecoveryException {
- getLoadedRecovery().commit();
- }
-
- private class PendingRecovery {
- private final TemporaryStorage tempStorage;
-
- PendingRecovery(TemporaryStorage tempStorage) {
- this.tempStorage = tempStorage;
- }
-
- void commit() {
- primaryStorage.write(new MutateWork.NoResult.Quiet() {
- @Override protected void execute(MutableStoreProvider storeProvider) {
- try {
- distributedStore.persist(tempStorage.toSnapshot());
- shutDownNow.execute();
- } catch (CodingException e) {
- throw new IllegalStateException("Failed to encode snapshot.", e);
- }
- }
- });
- }
-
- Set<IScheduledTask> query(final Query.Builder query) {
- return tempStorage.fetchTasks(query);
- }
-
- void delete(final Query.Builder query) {
- tempStorage.deleteTasks(query);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/storage/backup/StorageBackup.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/storage/backup/StorageBackup.java b/src/main/java/com/twitter/aurora/scheduler/storage/backup/StorageBackup.java
deleted file mode 100644
index 3faeb1f..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/storage/backup/StorageBackup.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.storage.backup;
-
-import java.io.File;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Ordering;
-import com.google.common.io.Files;
-import com.google.inject.BindingAnnotation;
-
-import com.twitter.aurora.codec.ThriftBinaryCodec;
-import com.twitter.aurora.codec.ThriftBinaryCodec.CodingException;
-import com.twitter.aurora.gen.storage.Snapshot;
-import com.twitter.aurora.scheduler.storage.SnapshotStore;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.stats.Stats;
-import com.twitter.common.util.Clock;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * A backup routine that layers over a snapshot store and periodically writes snapshots to
- * local disk.
- *
- * TODO(William Farner): Perform backups asynchronously. As written, they are performed in a
- * blocking write operation, which is asking for trouble.
- */
-public interface StorageBackup {
-
- /**
- * Perform a storage backup immediately, blocking until it is complete.
- */
- void backupNow();
-
- class StorageBackupImpl implements StorageBackup, SnapshotStore<Snapshot> {
- private static final Logger LOG = Logger.getLogger(StorageBackup.class.getName());
-
- private static final String FILE_PREFIX = "scheduler-backup-";
- private final BackupConfig config;
-
- static class BackupConfig {
- private final File dir;
- private final int maxBackups;
- private final Amount<Long, Time> interval;
-
- BackupConfig(File dir, int maxBackups, Amount<Long, Time> interval) {
- this.dir = checkNotNull(dir);
- this.maxBackups = maxBackups;
- this.interval = checkNotNull(interval);
- }
-
- @VisibleForTesting
- File getDir() {
- return dir;
- }
- }
-
- /**
- * Binding annotation that the underlying {@link SnapshotStore} must be bound with.
- */
- @BindingAnnotation
- @Target({FIELD, PARAMETER, METHOD}) @Retention(RUNTIME)
- @interface SnapshotDelegate { }
-
- private final SnapshotStore<Snapshot> delegate;
- private final Clock clock;
- private final long backupIntervalMs;
- private volatile long lastBackupMs;
- private final DateFormat backupDateFormat;
-
- private final AtomicLong successes = Stats.exportLong("scheduler_backup_success");
- @VisibleForTesting
- AtomicLong getSuccesses() {
- return successes;
- }
-
- private final AtomicLong failures = Stats.exportLong("scheduler_backup_failed");
- @VisibleForTesting
- AtomicLong getFailures() {
- return failures;
- }
-
- @Inject
- StorageBackupImpl(
- @SnapshotDelegate SnapshotStore<Snapshot> delegate,
- Clock clock,
- BackupConfig config) {
-
- this.delegate = checkNotNull(delegate);
- this.clock = checkNotNull(clock);
- this.config = checkNotNull(config);
- backupDateFormat = new SimpleDateFormat("yyyy-MM-dd-HH-mm");
- backupIntervalMs = config.interval.as(Time.MILLISECONDS);
- lastBackupMs = clock.nowMillis();
- }
-
- @Override public Snapshot createSnapshot() {
- Snapshot snapshot = delegate.createSnapshot();
- if (clock.nowMillis() >= (lastBackupMs + backupIntervalMs)) {
- save(snapshot);
- }
- return snapshot;
- }
-
- @Override public void backupNow() {
- save(delegate.createSnapshot());
- }
-
- @VisibleForTesting
- String createBackupName() {
- return FILE_PREFIX + backupDateFormat.format(new Date(clock.nowMillis()));
- }
-
- private void save(Snapshot snapshot) {
- lastBackupMs = clock.nowMillis();
-
- String backupName = createBackupName();
- String tempBackupName = "temp_" + backupName;
- File tempFile = new File(config.dir, tempBackupName);
- LOG.info("Saving backup to " + tempFile);
- try {
- byte[] backup = ThriftBinaryCodec.encodeNonNull(snapshot);
- Files.write(backup, tempFile);
- Files.move(tempFile, new File(config.dir, backupName));
- successes.incrementAndGet();
- } catch (IOException e) {
- failures.incrementAndGet();
- LOG.log(Level.SEVERE, "Failed to prepare backup " + backupName + ": " + e, e);
- } catch (CodingException e) {
- LOG.log(Level.SEVERE, "Failed to encode backup " + backupName + ": " + e, e);
- failures.incrementAndGet();
- } finally {
- if (tempFile.exists()) {
- LOG.info("Deleting incomplete backup file " + tempFile);
- tempFile.delete();
- }
- }
-
- File[] backups = config.dir.listFiles(BACKUP_FILTER);
- if (backups == null) {
- LOG.severe("Failed to list backup dir " + config.dir);
- } else {
- int backupsToDelete = backups.length - config.maxBackups;
- if (backupsToDelete > 0) {
- List<File> toDelete = Ordering.natural()
- .onResultOf(FILE_NAME)
- .sortedCopy(ImmutableList.copyOf(backups)).subList(0, backupsToDelete);
- LOG.info("Deleting " + backupsToDelete + " outdated backups: " + toDelete);
- for (File outdated : toDelete) {
- outdated.delete();
- }
- }
- }
- }
-
- private static final FilenameFilter BACKUP_FILTER = new FilenameFilter() {
- @Override public boolean accept(File file, String s) {
- return s.startsWith(FILE_PREFIX);
- }
- };
-
- @VisibleForTesting
- static final Function<File, String> FILE_NAME = new Function<File, String>() {
- @Override public String apply(File file) {
- return file.getName();
- }
- };
-
- @Override
- public void applySnapshot(Snapshot snapshot) {
- delegate.applySnapshot(snapshot);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bc1635df/src/main/java/com/twitter/aurora/scheduler/storage/backup/TemporaryStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/twitter/aurora/scheduler/storage/backup/TemporaryStorage.java b/src/main/java/com/twitter/aurora/scheduler/storage/backup/TemporaryStorage.java
deleted file mode 100644
index e0906fe..0000000
--- a/src/main/java/com/twitter/aurora/scheduler/storage/backup/TemporaryStorage.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Copyright 2013 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.aurora.scheduler.storage.backup;
-
-import java.util.Set;
-
-import com.google.common.base.Function;
-import com.google.common.collect.FluentIterable;
-
-import com.twitter.aurora.gen.storage.Snapshot;
-import com.twitter.aurora.scheduler.base.Query;
-import com.twitter.aurora.scheduler.base.Tasks;
-import com.twitter.aurora.scheduler.storage.SnapshotStore;
-import com.twitter.aurora.scheduler.storage.Storage;
-import com.twitter.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import com.twitter.aurora.scheduler.storage.Storage.MutateWork;
-import com.twitter.aurora.scheduler.storage.Storage.StoreProvider;
-import com.twitter.aurora.scheduler.storage.Storage.Work;
-import com.twitter.aurora.scheduler.storage.entities.IScheduledTask;
-import com.twitter.aurora.scheduler.storage.log.SnapshotStoreImpl;
-import com.twitter.aurora.scheduler.storage.mem.MemStorage;
-import com.twitter.common.util.testing.FakeClock;
-
-/**
- * An in-memory storage system, intended to be short-lived, whose contents can be captured as a
- * {@link Snapshot}.
- */
-interface TemporaryStorage {
-
-  /**
-   * Removes every task that matches a query.  Tasks removed this way are absent from any snapshot
-   * subsequently produced by {@link #toSnapshot()}.
-   *
-   * @param query Query builder selecting the tasks to delete.
-   */
-  void deleteTasks(Query.Builder query);
-
-  /**
-   * Looks up the tasks that match a query.
-   *
-   * @param query Query builder selecting the tasks to fetch.
-   * @return Matching tasks.
-   */
-  Set<IScheduledTask> fetchTasks(Query.Builder query);
-
-  /**
-   * Captures the current contents of the temporary storage as a snapshot.
-   *
-   * @return Temporary storage snapshot.
-   */
-  Snapshot toSnapshot();
-
-  /**
-   * Builds temporary storage instances from snapshots.  Each instance is backed by its own
-   * freshly-created in-memory storage.
-   */
-  class TemporaryStorageFactory implements Function<Snapshot, TemporaryStorage> {
-    @Override public TemporaryStorage apply(Snapshot snapshot) {
-      // Pin the clock handed to the snapshot store to the timestamp recorded in the snapshot.
-      FakeClock snapshotClock = new FakeClock();
-      snapshotClock.setNowMillis(snapshot.getTimestamp());
-
-      final Storage memStorage = MemStorage.newEmptyStorage();
-      final SnapshotStore<Snapshot> store = new SnapshotStoreImpl(snapshotClock, memStorage);
-      store.applySnapshot(snapshot);
-
-      return new TemporaryStorage() {
-        @Override public void deleteTasks(final Query.Builder query) {
-          memStorage.write(new MutateWork.NoResult.Quiet() {
-            @Override protected void execute(MutableStoreProvider storeProvider) {
-              // Resolve the query to task IDs first, then delete by ID.
-              Iterable<IScheduledTask> matched = storeProvider.getTaskStore().fetchTasks(query);
-              Set<String> doomedIds =
-                  FluentIterable.from(matched).transform(Tasks.SCHEDULED_TO_ID).toSet();
-              storeProvider.getUnsafeTaskStore().deleteTasks(doomedIds);
-            }
-          });
-        }
-
-        @Override public Set<IScheduledTask> fetchTasks(final Query.Builder query) {
-          Work.Quiet<Set<IScheduledTask>> fetch = new Work.Quiet<Set<IScheduledTask>>() {
-            @Override public Set<IScheduledTask> apply(StoreProvider storeProvider) {
-              return storeProvider.getTaskStore().fetchTasks(query);
-            }
-          };
-          return memStorage.consistentRead(fetch);
-        }
-
-        @Override public Snapshot toSnapshot() {
-          return store.createSnapshot();
-        }
-      };
-    }
-  }
-}