You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2017/10/25 06:34:18 UTC
[4/5] aurora git commit: Exclusively use Map-based in-memory stores
for primary storage
http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/main/java/org/apache/aurora/scheduler/storage/mem/MemJobUpdateStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemJobUpdateStore.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemJobUpdateStore.java
new file mode 100644
index 0000000..d190add
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemJobUpdateStore.java
@@ -0,0 +1,396 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.aurora.scheduler.storage.mem;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import javax.inject.Inject;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.Ordering;
+import com.google.common.primitives.Longs;
+
+import org.apache.aurora.common.base.MorePreconditions;
+import org.apache.aurora.common.inject.TimedInterceptor.Timed;
+import org.apache.aurora.common.stats.StatsProvider;
+import org.apache.aurora.gen.JobInstanceUpdateEvent;
+import org.apache.aurora.gen.JobUpdateAction;
+import org.apache.aurora.gen.JobUpdateDetails;
+import org.apache.aurora.gen.JobUpdateEvent;
+import org.apache.aurora.gen.JobUpdateState;
+import org.apache.aurora.gen.JobUpdateStatus;
+import org.apache.aurora.gen.LockKey;
+import org.apache.aurora.gen.storage.StoredJobUpdateDetails;
+import org.apache.aurora.scheduler.storage.JobUpdateStore;
+import org.apache.aurora.scheduler.storage.LockStore;
+import org.apache.aurora.scheduler.storage.Storage.StorageException;
+import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery;
+import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary;
+import org.apache.aurora.scheduler.storage.entities.ILock;
+import org.apache.aurora.scheduler.storage.entities.ILockKey;
+
+import static java.util.Objects.requireNonNull;
+
+import static org.apache.aurora.scheduler.storage.Util.jobUpdateActionStatName;
+import static org.apache.aurora.scheduler.storage.Util.jobUpdateStatusStatName;
+
+public class MemJobUpdateStore implements JobUpdateStore.Mutable {
+
+ private static final Ordering<IJobUpdateDetails> REVERSE_LAST_MODIFIED_ORDER = Ordering.natural()
+ .reverse()
+ .onResultOf(u -> u.getUpdate().getSummary().getState().getLastModifiedTimestampMs());
+
+ private final Map<IJobUpdateKey, UpdateAndLock> updates = Maps.newConcurrentMap();
+ private final LockStore lockStore;
+ private final LoadingCache<JobUpdateStatus, AtomicLong> jobUpdateEventStats;
+ private final LoadingCache<JobUpdateAction, AtomicLong> jobUpdateActionStats;
+
+ @Inject
+ public MemJobUpdateStore(LockStore.Mutable lockStore, StatsProvider statsProvider) {
+ this.lockStore = lockStore;
+ this.jobUpdateEventStats = CacheBuilder.newBuilder()
+ .build(new CacheLoader<JobUpdateStatus, AtomicLong>() {
+ @Override
+ public AtomicLong load(JobUpdateStatus status) {
+ return statsProvider.makeCounter(jobUpdateStatusStatName(status));
+ }
+ });
+ for (JobUpdateStatus status : JobUpdateStatus.values()) {
+ jobUpdateEventStats.getUnchecked(status).get();
+ }
+ this.jobUpdateActionStats = CacheBuilder.newBuilder()
+ .build(new CacheLoader<JobUpdateAction, AtomicLong>() {
+ @Override
+ public AtomicLong load(JobUpdateAction action) {
+ return statsProvider.makeCounter(jobUpdateActionStatName(action));
+ }
+ });
+ for (JobUpdateAction action : JobUpdateAction.values()) {
+ jobUpdateActionStats.getUnchecked(action).get();
+ }
+ }
+
+ @Timed("job_update_store_fetch_summaries")
+ @Override
+ public synchronized List<IJobUpdateSummary> fetchJobUpdateSummaries(IJobUpdateQuery query) {
+ return performQuery(query)
+ .map(u -> u.getUpdate().getSummary())
+ .collect(Collectors.toList());
+ }
+
+ @Timed("job_update_store_fetch_details_list")
+ @Override
+ public synchronized List<IJobUpdateDetails> fetchJobUpdateDetails(IJobUpdateQuery query) {
+ return performQuery(query).collect(Collectors.toList());
+ }
+
+ @Timed("job_update_store_fetch_details")
+ @Override
+ public synchronized Optional<IJobUpdateDetails> fetchJobUpdateDetails(IJobUpdateKey key) {
+ return Optional.fromNullable(updates.get(key)).transform(u -> u.details);
+ }
+
+ @Timed("job_update_store_fetch_update")
+ @Override
+ public synchronized Optional<IJobUpdate> fetchJobUpdate(IJobUpdateKey key) {
+ return Optional.fromNullable(updates.get(key)).transform(u -> u.details.getUpdate());
+ }
+
+ @Timed("job_update_store_fetch_instructions")
+ @Override
+ public synchronized Optional<IJobUpdateInstructions> fetchJobUpdateInstructions(
+ IJobUpdateKey key) {
+
+ return Optional.fromNullable(updates.get(key))
+ .transform(u -> u.details.getUpdate().getInstructions());
+ }
+
+ private void refreshLocks() {
+ // Simulate database behavior of join performed against locks, used to populate lockToken field.
+
+ ImmutableMap.Builder<IJobUpdateKey, UpdateAndLock> refreshed = ImmutableMap.builder();
+ for (Map.Entry<IJobUpdateKey, UpdateAndLock> entry : updates.entrySet()) {
+ IJobUpdateDetails update = entry.getValue().details;
+ Optional<String> updateLock = entry.getValue().lockToken;
+ if (updateLock.isPresent()) {
+ // Determine if token needs to be cleared to reflect lock store state. The token may only
+ // remain if the lock store token exists and matches.
+ Optional<String> storedLock = Optional.fromNullable(lockStore.fetchLock(ILockKey.build(
+ LockKey.job(entry.getKey().getJob().newBuilder()))).map(ILock::getToken).orElse(null));
+ if (!storedLock.isPresent() || !updateLock.equals(storedLock)) {
+ refreshed.put(entry.getKey(), new UpdateAndLock(update, Optional.absent()));
+ }
+ }
+ }
+
+ updates.putAll(refreshed.build());
+ }
+
+ @Timed("job_update_store_fetch_all_details")
+ @Override
+ public synchronized Set<StoredJobUpdateDetails> fetchAllJobUpdateDetails() {
+ refreshLocks();
+ return updates.values().stream()
+ .map(u -> new StoredJobUpdateDetails(u.details.newBuilder(), u.lockToken.orNull()))
+ .collect(Collectors.toSet());
+ }
+
+ @Timed("job_update_store_fetch_instance_events")
+ @Override
+ public synchronized List<IJobInstanceUpdateEvent> fetchInstanceEvents(
+ IJobUpdateKey key,
+ int instanceId) {
+
+ return java.util.Optional.ofNullable(updates.get(key))
+ .map(u -> u.details.getInstanceEvents())
+ .orElse(ImmutableList.of())
+ .stream()
+ .filter(e -> e.getInstanceId() == instanceId)
+ .collect(Collectors.toList());
+ }
+
+ private static void validateInstructions(IJobUpdateInstructions instructions) {
+ if (!instructions.isSetDesiredState() && instructions.getInitialState().isEmpty()) {
+ throw new IllegalArgumentException(
+ "Missing both initial and desired states. At least one is required.");
+ }
+
+ if (!instructions.getInitialState().isEmpty()) {
+ if (instructions.getInitialState().stream().anyMatch(t -> t.getTask() == null)) {
+ throw new NullPointerException("Invalid initial instance state.");
+ }
+ Preconditions.checkArgument(
+ instructions.getInitialState().stream().noneMatch(t -> t.getInstances().isEmpty()),
+ "Invalid intial instance state ranges.");
+ }
+
+ if (instructions.getDesiredState() != null) {
+ MorePreconditions.checkNotBlank(instructions.getDesiredState().getInstances());
+ Preconditions.checkNotNull(instructions.getDesiredState().getTask());
+ }
+ }
+
+ @Timed("job_update_store_save_update")
+ @Override
+ public synchronized void saveJobUpdate(IJobUpdate update, Optional<String> lockToken) {
+ requireNonNull(update);
+ validateInstructions(update.getInstructions());
+
+ if (updates.containsKey(update.getSummary().getKey())) {
+ throw new StorageException("Update already exists: " + update.getSummary().getKey());
+ }
+
+ JobUpdateDetails mutable = new JobUpdateDetails()
+ .setUpdate(update.newBuilder())
+ .setUpdateEvents(ImmutableList.of())
+ .setInstanceEvents(ImmutableList.of());
+ mutable.getUpdate().getSummary().setState(synthesizeUpdateState(mutable));
+
+ updates.put(
+ update.getSummary().getKey(),
+ new UpdateAndLock(IJobUpdateDetails.build(mutable), lockToken));
+ }
+
+ private static final Ordering<JobUpdateEvent> EVENT_ORDERING = Ordering.natural()
+ .onResultOf(JobUpdateEvent::getTimestampMs);
+
+ @Timed("job_update_store_save_event")
+ @Override
+ public synchronized void saveJobUpdateEvent(IJobUpdateKey key, IJobUpdateEvent event) {
+ UpdateAndLock update = updates.get(key);
+ if (update == null) {
+ throw new StorageException("Update not found: " + key);
+ }
+
+ JobUpdateDetails mutable = update.details.newBuilder();
+ mutable.addToUpdateEvents(event.newBuilder());
+ mutable.setUpdateEvents(EVENT_ORDERING.sortedCopy(mutable.getUpdateEvents()));
+ mutable.getUpdate().getSummary().setState(synthesizeUpdateState(mutable));
+ updates.put(key, new UpdateAndLock(IJobUpdateDetails.build(mutable), update.lockToken));
+ jobUpdateEventStats.getUnchecked(event.getStatus()).incrementAndGet();
+ }
+
+ private static final Ordering<JobInstanceUpdateEvent> INSTANCE_EVENT_ORDERING = Ordering.natural()
+ .onResultOf(JobInstanceUpdateEvent::getTimestampMs);
+
+ @Timed("job_update_store_save_instance_event")
+ @Override
+ public synchronized void saveJobInstanceUpdateEvent(
+ IJobUpdateKey key,
+ IJobInstanceUpdateEvent event) {
+
+ UpdateAndLock update = updates.get(key);
+ if (update == null) {
+ throw new StorageException("Update not found: " + key);
+ }
+
+ JobUpdateDetails mutable = update.details.newBuilder();
+ mutable.addToInstanceEvents(event.newBuilder());
+ mutable.setInstanceEvents(INSTANCE_EVENT_ORDERING.sortedCopy(mutable.getInstanceEvents()));
+ mutable.getUpdate().getSummary().setState(synthesizeUpdateState(mutable));
+ updates.put(key, new UpdateAndLock(IJobUpdateDetails.build(mutable), update.lockToken));
+ jobUpdateActionStats.getUnchecked(event.getAction()).incrementAndGet();
+ }
+
+ @Timed("job_update_store_delete_all")
+ @Override
+ public synchronized void deleteAllUpdatesAndEvents() {
+ updates.clear();
+ }
+
+ @Timed("job_update_store_prune_history")
+ @Override
+ public synchronized Set<IJobUpdateKey> pruneHistory(
+ int perJobRetainCount,
+ long historyPruneThresholdMs) {
+
+ Supplier<Stream<IJobUpdateSummary>> completedUpdates = () -> updates.values().stream()
+ .map(u -> u.details.getUpdate().getSummary())
+ .filter(s -> TERMINAL_STATES.contains(s.getState().getStatus()));
+
+ Predicate<IJobUpdateSummary> expiredFilter =
+ s -> s.getState().getCreatedTimestampMs() < historyPruneThresholdMs;
+
+ ImmutableSet.Builder<IJobUpdateKey> pruneBuilder = ImmutableSet.builder();
+
+ // Gather updates based on time threshold.
+ pruneBuilder.addAll(completedUpdates.get()
+ .filter(expiredFilter)
+ .map(IJobUpdateSummary::getKey)
+ .collect(Collectors.toList()));
+
+ Multimap<IJobKey, IJobUpdateSummary> updatesByJob = Multimaps.index(
+ // Avoid counting to-be-removed expired updates.
+ completedUpdates.get().filter(expiredFilter.negate()).iterator(),
+ s -> s.getKey().getJob());
+
+ for (Map.Entry<IJobKey, Collection<IJobUpdateSummary>> entry
+ : updatesByJob.asMap().entrySet()) {
+
+ if (entry.getValue().size() > perJobRetainCount) {
+ Ordering<IJobUpdateSummary> creationOrder = Ordering.natural()
+ .onResultOf(s -> s.getState().getCreatedTimestampMs());
+ pruneBuilder.addAll(creationOrder
+ .leastOf(entry.getValue(), entry.getValue().size() - perJobRetainCount)
+ .stream()
+ .map(s -> s.getKey())
+ .iterator());
+ }
+ }
+
+ Set<IJobUpdateKey> pruned = pruneBuilder.build();
+ updates.keySet().removeAll(pruned);
+
+ return pruned;
+ }
+
+ private static JobUpdateState synthesizeUpdateState(JobUpdateDetails update) {
+ JobUpdateState state = update.getUpdate().getSummary().getState();
+ if (state == null) {
+ state = new JobUpdateState();
+ }
+
+ JobUpdateEvent firstEvent = Iterables.getFirst(update.getUpdateEvents(), null);
+ if (firstEvent != null) {
+ state.setCreatedTimestampMs(firstEvent.getTimestampMs());
+ }
+
+ JobUpdateEvent lastEvent = Iterables.getLast(update.getUpdateEvents(), null);
+ if (lastEvent != null) {
+ state.setStatus(lastEvent.getStatus());
+ state.setLastModifiedTimestampMs(lastEvent.getTimestampMs());
+ }
+
+ JobInstanceUpdateEvent lastInstanceEvent = Iterables.getLast(update.getInstanceEvents(), null);
+ if (lastInstanceEvent != null) {
+ state.setLastModifiedTimestampMs(
+ Longs.max(state.getLastModifiedTimestampMs(), lastInstanceEvent.getTimestampMs()));
+ }
+
+ return state;
+ }
+
+ private Stream<IJobUpdateDetails> performQuery(IJobUpdateQuery query) {
+ Predicate<IJobUpdateDetails> filter = u -> true;
+ if (query.getRole() != null) {
+ filter = filter.and(
+ u -> u.getUpdate().getSummary().getKey().getJob().getRole().equals(query.getRole()));
+ }
+ if (query.getKey() != null) {
+ filter = filter.and(u -> u.getUpdate().getSummary().getKey().equals(query.getKey()));
+ }
+ if (query.getJobKey() != null) {
+ filter = filter.and(
+ u -> u.getUpdate().getSummary().getKey().getJob().equals(query.getJobKey()));
+ }
+ if (query.getUser() != null) {
+ filter = filter.and(u -> u.getUpdate().getSummary().getUser().equals(query.getUser()));
+ }
+ if (query.getUpdateStatuses() != null && !query.getUpdateStatuses().isEmpty()) {
+ filter = filter.and(u -> query.getUpdateStatuses()
+ .contains(u.getUpdate().getSummary().getState().getStatus()));
+ }
+
+ // TODO(wfarner): Modification time is not a stable ordering for pagination, but we use it as
+ // such here. The behavior is carried over from DbJobupdateStore; determine if it is desired.
+ Stream<IJobUpdateDetails> matches = updates.values().stream()
+ .map(u -> u.details)
+ .filter(filter)
+ .sorted(REVERSE_LAST_MODIFIED_ORDER)
+ .skip(query.getOffset());
+
+ if (query.getLimit() > 0) {
+ matches = matches.limit(query.getLimit());
+ }
+
+ return matches;
+ }
+
+ private static final class UpdateAndLock {
+ private final IJobUpdateDetails details;
+ private final Optional<String> lockToken;
+
+ UpdateAndLock(IJobUpdateDetails details, Optional<String> lockToken) {
+ this.details = details;
+ this.lockToken = lockToken;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/main/java/org/apache/aurora/scheduler/storage/mem/MemLockStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemLockStore.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemLockStore.java
new file mode 100644
index 0000000..4c7bda8
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemLockStore.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.mem;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import com.google.common.base.Predicates;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+
+import org.apache.aurora.scheduler.storage.LockStore;
+import org.apache.aurora.scheduler.storage.Storage.StorageException;
+import org.apache.aurora.scheduler.storage.entities.ILock;
+import org.apache.aurora.scheduler.storage.entities.ILockKey;
+
+/**
+ * An in-memory lock store.
+ */
+class MemLockStore implements LockStore.Mutable {
+
+ private final Map<ILockKey, ILock> locks = Maps.newConcurrentMap();
+
+ @Override
+ public void saveLock(ILock lock) {
+ // TODO(wfarner): Re-evaluate, this is not idempotent.
+ if (locks.containsKey(lock.getKey())) {
+ throw new StorageException("Duplicate lock key");
+ }
+ if (FluentIterable.from(locks.values())
+ .transform(ILock::getToken)
+ .anyMatch(Predicates.equalTo(lock.getToken()))) {
+
+ throw new StorageException("Duplicate token");
+ }
+
+ locks.put(lock.getKey(), lock);
+ }
+
+ @Override
+ public void removeLock(ILockKey lockKey) {
+ locks.remove(lockKey);
+ }
+
+ @Override
+ public void deleteLocks() {
+ locks.clear();
+ }
+
+ @Override
+ public Set<ILock> fetchLocks() {
+ return ImmutableSet.copyOf(locks.values());
+ }
+
+ @Override
+ public Optional<ILock> fetchLock(ILockKey lockKey) {
+ return Optional.ofNullable(locks.get(lockKey));
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/main/java/org/apache/aurora/scheduler/storage/mem/MemQuotaStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemQuotaStore.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemQuotaStore.java
new file mode 100644
index 0000000..61b9eee
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemQuotaStore.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.mem;
+
+import java.util.Map;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+
+import org.apache.aurora.scheduler.storage.QuotaStore;
+import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
+
+/**
+ * An in-memory quota store.
+ */
+class MemQuotaStore implements QuotaStore.Mutable {
+
+ private final Map<String, IResourceAggregate> quotas = Maps.newConcurrentMap();
+
+ @Override
+ public void deleteQuotas() {
+ quotas.clear();
+ }
+
+ @Override
+ public void removeQuota(String role) {
+ quotas.remove(role);
+ }
+
+ @Override
+ public void saveQuota(String role, IResourceAggregate quota) {
+ quotas.put(role, quota);
+ }
+
+ @Override
+ public Optional<IResourceAggregate> fetchQuota(String role) {
+ return Optional.fromNullable(quotas.get(role));
+ }
+
+ @Override
+ public Map<String, IResourceAggregate> fetchQuotas() {
+ return ImmutableMap.copyOf(quotas);
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/main/java/org/apache/aurora/scheduler/storage/mem/MemSchedulerStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemSchedulerStore.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemSchedulerStore.java
new file mode 100644
index 0000000..8a7ddd7
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemSchedulerStore.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.mem;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Atomics;
+
+import org.apache.aurora.scheduler.storage.SchedulerStore;
+
+/**
+ * An in-memory scheduler store.
+ */
+class MemSchedulerStore implements SchedulerStore.Mutable {
+ private final AtomicReference<String> frameworkId = Atomics.newReference();
+
+ @Override
+ public void saveFrameworkId(String newFrameworkId) {
+ frameworkId.set(newFrameworkId);
+ }
+
+ @Override
+ public Optional<String> fetchFrameworkId() {
+ return Optional.fromNullable(frameworkId.get());
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java
new file mode 100644
index 0000000..7ace104
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.mem;
+
+import javax.inject.Inject;
+
+import org.apache.aurora.common.inject.TimedInterceptor.Timed;
+import org.apache.aurora.scheduler.storage.AttributeStore;
+import org.apache.aurora.scheduler.storage.CronJobStore;
+import org.apache.aurora.scheduler.storage.JobUpdateStore;
+import org.apache.aurora.scheduler.storage.LockStore;
+import org.apache.aurora.scheduler.storage.QuotaStore;
+import org.apache.aurora.scheduler.storage.SchedulerStore;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.TaskStore;
+
+/**
+ * A storage implementation comprised of individual in-memory store implementations.
+ */
+public class MemStorage implements Storage {
+ private final MutableStoreProvider storeProvider;
+
+ @Inject
+ MemStorage(
+ @Volatile final SchedulerStore.Mutable schedulerStore,
+ @Volatile final CronJobStore.Mutable jobStore,
+ @Volatile final TaskStore.Mutable taskStore,
+ @Volatile final LockStore.Mutable lockStore,
+ @Volatile final QuotaStore.Mutable quotaStore,
+ @Volatile final AttributeStore.Mutable attributeStore,
+ @Volatile final JobUpdateStore.Mutable updateStore) {
+
+ storeProvider = new MutableStoreProvider() {
+ @Override
+ public SchedulerStore.Mutable getSchedulerStore() {
+ return schedulerStore;
+ }
+
+ @Override
+ public CronJobStore.Mutable getCronJobStore() {
+ return jobStore;
+ }
+
+ @Override
+ public TaskStore getTaskStore() {
+ return taskStore;
+ }
+
+ @Override
+ public TaskStore.Mutable getUnsafeTaskStore() {
+ return taskStore;
+ }
+
+ @Override
+ public LockStore.Mutable getLockStore() {
+ return lockStore;
+ }
+
+ @Override
+ public QuotaStore.Mutable getQuotaStore() {
+ return quotaStore;
+ }
+
+ @Override
+ public AttributeStore.Mutable getAttributeStore() {
+ return attributeStore;
+ }
+
+ @Override
+ public JobUpdateStore.Mutable getJobUpdateStore() {
+ return updateStore;
+ }
+ };
+ }
+
+ @Timed("mem_storage_read_operation")
+ @Override
+ public <T, E extends Exception> T read(final Work<T, E> work) throws StorageException, E {
+ return work.apply(storeProvider);
+ }
+
+ @Timed("mem_storage_write_operation")
+ @Override
+ public <T, E extends Exception> T write(final MutateWork<T, E> work) throws StorageException, E {
+ return work.apply(storeProvider);
+ }
+
+ @Override
+ public void prepare() {
+ // No-op.
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorageModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorageModule.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorageModule.java
new file mode 100644
index 0000000..2ad84eb
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorageModule.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.mem;
+
+import javax.inject.Singleton;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Key;
+import com.google.inject.PrivateModule;
+import com.google.inject.TypeLiteral;
+
+import org.apache.aurora.common.inject.Bindings.KeyFactory;
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.stats.StatsProvider;
+import org.apache.aurora.scheduler.storage.AttributeStore;
+import org.apache.aurora.scheduler.storage.CronJobStore;
+import org.apache.aurora.scheduler.storage.JobUpdateStore;
+import org.apache.aurora.scheduler.storage.LockStore;
+import org.apache.aurora.scheduler.storage.QuotaStore;
+import org.apache.aurora.scheduler.storage.SchedulerStore;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.Volatile;
+import org.apache.aurora.scheduler.storage.TaskStore;
+import org.apache.aurora.scheduler.storage.mem.MemTaskStore.SlowQueryThreshold;
+import org.apache.aurora.scheduler.testing.FakeStatsProvider;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Binding module for in-memory stores.
+ * <p>
+ * NOTE: These stores are being phased out in favor of database-backed stores.
+ */
+public final class MemStorageModule extends PrivateModule {
+
+ private final KeyFactory keyFactory;
+
+ public MemStorageModule() {
+ this(KeyFactory.PLAIN);
+ }
+
+ public MemStorageModule(KeyFactory keyFactory) {
+ this.keyFactory = requireNonNull(keyFactory);
+ }
+
+ private <T> void bindStore(Class<T> binding, Class<? extends T> impl) {
+ bind(binding).to(impl);
+ bind(impl).in(Singleton.class);
+ Key<T> key = Key.get(binding, Volatile.class);
+ bind(key).to(impl);
+ expose(key);
+ expose(binding);
+ }
+
+ @Override
+ protected void configure() {
+ bind(new TypeLiteral<Amount<Long, Time>>() { }).annotatedWith(SlowQueryThreshold.class)
+ .toInstance(Amount.of(25L, Time.MILLISECONDS));
+ bindStore(TaskStore.Mutable.class, MemTaskStore.class);
+ bindStore(CronJobStore.Mutable.class, MemCronJobStore.class);
+ bindStore(AttributeStore.Mutable.class, MemAttributeStore.class);
+ bindStore(LockStore.Mutable.class, MemLockStore.class);
+ bindStore(QuotaStore.Mutable.class, MemQuotaStore.class);
+ bindStore(SchedulerStore.Mutable.class, MemSchedulerStore.class);
+ bindStore(JobUpdateStore.Mutable.class, MemJobUpdateStore.class);
+
+ Key<Storage> storageKey = keyFactory.create(Storage.class);
+ bind(storageKey).to(MemStorage.class);
+ bind(MemStorage.class).in(Singleton.class);
+ expose(storageKey);
+ }
+
+ /**
+ * Creates a new empty in-memory storage for use in testing.
+ */
+ @VisibleForTesting
+ public static Storage newEmptyStorage() {
+ Injector injector = Guice.createInjector(
+ new MemStorageModule(),
+ new AbstractModule() {
+ @Override
+ protected void configure() {
+ bind(StatsProvider.class).to(FakeStatsProvider.class);
+ bind(FakeStatsProvider.class).in(Singleton.class);
+ }
+ });
+
+ Storage storage = injector.getInstance(Storage.class);
+ storage.prepare();
+ return storage;
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/main/java/org/apache/aurora/scheduler/storage/mem/Util.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/Util.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/Util.java
deleted file mode 100644
index c28fb65..0000000
--- a/src/main/java/org/apache/aurora/scheduler/storage/mem/Util.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage.mem;
-
-import com.google.common.base.Function;
-
-import org.apache.thrift.TBase;
-
-/**
- * Utility class for common operations amongst in-memory store implementations.
- */
-final class Util {
-
- private Util() {
- // Utility class.
- }
-
- /**
- * Creates a function that performs a 'deep copy' on a thrift struct of a specific type. The
- * resulting copied objects will be exactly identical to the original. Mutations to the original
- * object will not be reflected in the copy, and vice versa.
- *
- * @return A copier for the provided type of thrift structs.
- */
- static <T extends TBase<T, ?>> Function<T, T> deepCopier() {
- return input -> {
- if (input == null) {
- return null;
- }
-
- @SuppressWarnings("unchecked")
- T t = (T) input.deepCopy();
- return t;
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
index 3d35e97..27c0b43 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
@@ -77,8 +77,6 @@ import static com.google.common.base.Preconditions.checkState;
import static org.apache.aurora.gen.JobUpdateStatus.ABORTED;
import static org.apache.aurora.gen.JobUpdateStatus.ERROR;
-import static org.apache.aurora.gen.JobUpdateStatus.ROLLED_BACK;
-import static org.apache.aurora.gen.JobUpdateStatus.ROLLED_FORWARD;
import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_BACK;
import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_FORWARD;
import static org.apache.aurora.gen.JobUpdateStatus.ROLL_FORWARD_AWAITING_PULSE;
@@ -470,14 +468,6 @@ class JobUpdateControllerImpl implements JobUpdateController {
changeJobUpdateStatus(storeProvider, key, event, true);
}
- private static final Set<JobUpdateStatus> TERMINAL_STATES = ImmutableSet.of(
- ROLLED_FORWARD,
- ROLLED_BACK,
- ABORTED,
- JobUpdateStatus.FAILED,
- ERROR
- );
-
private void changeJobUpdateStatus(
MutableStoreProvider storeProvider,
IJobUpdateKey key,
@@ -494,7 +484,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
IJobUpdateEvent.build(proposedEvent.setTimestampMs(clock.nowMillis()).setStatus(status)));
}
- if (TERMINAL_STATES.contains(status)) {
+ if (JobUpdateStore.TERMINAL_STATES.contains(status)) {
lockManager.releaseLock(key.getJob());
pulseHandler.remove(key);
} else {
http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
index efbc42c..67a0d5a 100644
--- a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
@@ -218,7 +218,7 @@ public class SchedulerIT extends BaseZooKeeperTest {
ImmutableList.<Module>builder()
.add(SchedulerMain.getUniversalModule(new CliOptions()))
.add(new TierModule(TaskTestUtil.TIER_CONFIG))
- .add(new LogStorageModule(new LogStorageModule.Options(), false))
+ .add(new LogStorageModule(new LogStorageModule.Options()))
.add(new ServiceDiscoveryModule(zkClientConfig, SERVERSET_PATH))
.add(testModule)
.build()
http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java b/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java
index 3dee55a..c639ab6 100644
--- a/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java
+++ b/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java
@@ -15,18 +15,15 @@ package org.apache.aurora.scheduler.app.local;
import java.io.File;
import java.util.List;
-import java.util.Set;
import javax.inject.Singleton;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
import com.google.common.io.Files;
import com.google.inject.AbstractModule;
import com.google.inject.Key;
import com.google.inject.Module;
-import com.google.inject.TypeLiteral;
import com.google.inject.util.Modules;
import org.apache.aurora.scheduler.TierModule;
@@ -40,8 +37,6 @@ import org.apache.aurora.scheduler.mesos.FrameworkInfoFactory;
import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
-import org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl;
-import org.apache.aurora.scheduler.storage.log.SnapshotStoreImpl.HydrateSnapshotFields;
import org.apache.mesos.SchedulerDriver;
import org.apache.mesos.v1.Protos;
import org.apache.shiro.io.ResourceUtils;
@@ -79,16 +74,12 @@ public final class LocalSchedulerMain {
.add("-shiro_ini_path="
+ ResourceUtils.CLASSPATH_PREFIX
+ "org/apache/aurora/scheduler/http/api/security/shiro-example.ini")
- .add("-enable_h2_console=true")
.build();
CliOptions options = CommandLine.parseOptions(arguments.toArray(new String[] {}));
Module persistentStorage = new AbstractModule() {
@Override
protected void configure() {
- bind(new TypeLiteral<Boolean>() { })
- .annotatedWith(SnapshotStoreImpl.ExperimentalTaskStore.class)
- .toInstance(false);
bind(Storage.class).to(Key.get(Storage.class, Storage.Volatile.class));
bind(NonVolatileStorage.class).to(FakeNonVolatileStorage.class);
bind(DistributedSnapshotStore.class).toInstance(snapshot -> { });
@@ -108,8 +99,6 @@ public final class LocalSchedulerMain {
bind(FrameworkInfoFactory.class).to(FrameworkInfoFactory.FrameworkInfoFactoryImpl.class);
bind(FrameworkInfoFactory.FrameworkInfoFactoryImpl.class).in(Singleton.class);
install(new ClusterSimulatorModule());
- bind(new TypeLiteral<Set<String>>() { }).annotatedWith(HydrateSnapshotFields.class)
- .toInstance(ImmutableSet.of());
}
};
http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java b/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
index f7c945d..244422c 100644
--- a/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
@@ -29,7 +29,6 @@ import com.google.common.base.Predicates;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.inject.AbstractModule;
@@ -170,17 +169,9 @@ public class CommandLineTest {
expected.updater.enableAffinity = true;
expected.updater.affinityExpiration = TEST_TIME;
expected.state.taskAssignerModules = ImmutableList.of(NoopModule.class);
- expected.db.useDbTaskStore = true;
- expected.db.enableDbMetrics = false;
- expected.db.slowQueryLogThreshold = TEST_TIME;
- expected.db.dbRowGcInterval = TEST_TIME;
- expected.db.h2LockTimeout = TEST_TIME;
- expected.db.mybatisMaxActiveConnectionCount = 42;
- expected.db.mybatisMaxIdleConnectionCount = 42;
expected.logStorage.shutdownGracePeriod = TEST_TIME;
expected.logStorage.snapshotInterval = TEST_TIME;
expected.logStorage.maxLogEntrySize = TEST_DATA;
- expected.logStorage.hydrateSnapshotFields = ImmutableSet.of("testing");
expected.backup.backupInterval = TEST_TIME;
expected.backup.maxSavedBackups = 42;
expected.backup.backupDir = new File("testing");
@@ -225,7 +216,6 @@ public class CommandLineTest {
expected.iniShiroRealm.shiroIniPath = testIni;
expected.iniShiroRealm.shiroCredentialsMatcher = AllowAllCredentialsMatcher.class;
expected.api.enableCorsFor = "testing";
- expected.h2Console.enableH2Console = true;
expected.preemptor.enablePreemptor = false;
expected.preemptor.preemptionDelay = TEST_TIME;
expected.preemptor.preemptionSlotHoldTime = TEST_TIME;
@@ -323,17 +313,9 @@ public class CommandLineTest {
"-enable_update_affinity=true",
"-update_affinity_reservation_hold_time=42days",
"-task_assigner_modules=org.apache.aurora.scheduler.config.CommandLineTest$NoopModule",
- "-use_beta_db_task_store=true",
- "-enable_db_metrics=false",
- "-slow_query_log_threshold=42days",
- "-db_row_gc_interval=42days",
- "-db_lock_timeout=42days",
- "-db_max_active_connection_count=42",
- "-db_max_idle_connection_count=42",
"-dlog_shutdown_grace_period=42days",
"-dlog_snapshot_interval=42days",
"-dlog_max_entry_size=42GB",
- "-snapshot_hydrate_stores=testing",
"-backup_interval=42days",
"-max_saved_backups=42",
"-backup_dir=testing",
@@ -366,7 +348,6 @@ public class CommandLineTest {
"-shiro_credentials_matcher="
+ "org.apache.shiro.authc.credential.AllowAllCredentialsMatcher",
"-enable_cors_for=testing",
- "-enable_h2_console=true",
"-enable_preemptor=false",
"-preemption_delay=42days",
"-preemption_slot_hold_time=42days",
http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java b/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java
index 2b81707..742a97c 100644
--- a/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java
@@ -33,8 +33,8 @@ import org.apache.aurora.scheduler.state.StateChangeResult;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
-import org.apache.aurora.scheduler.storage.db.DbUtil;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
import org.easymock.Capture;
import org.junit.Before;
import org.junit.Test;
@@ -65,7 +65,7 @@ public class AuroraCronJobTest extends EasyMockTest {
@Before
public void setUp() throws Exception {
- storage = DbUtil.createStorage();
+ storage = MemStorageModule.newEmptyStorage();
stateManager = createMock(StateManager.class);
backoffHelper = createMock(BackoffHelper.class);
context = createMock(JobExecutionContext.class);
@@ -101,11 +101,11 @@ public class AuroraCronJobTest extends EasyMockTest {
populateStorage(CronCollisionPolicy.CANCEL_NEW);
auroraCronJob.doExecute(context);
- storage = DbUtil.createStorage();
+ storage = MemStorageModule.newEmptyStorage();
populateStorage(CronCollisionPolicy.KILL_EXISTING);
auroraCronJob.doExecute(context);
- storage = DbUtil.createStorage();
+ storage = MemStorageModule.newEmptyStorage();
populateStorage(CronCollisionPolicy.RUN_OVERLAP);
auroraCronJob.doExecute(context);
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java
index 1ed6a7b..0fabb33 100644
--- a/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java
@@ -39,9 +39,9 @@ import org.apache.aurora.scheduler.events.EventSink;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
-import org.apache.aurora.scheduler.storage.db.DbUtil;
import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
import org.junit.Before;
import org.junit.Test;
import org.quartz.JobExecutionContext;
@@ -79,7 +79,7 @@ public class CronIT extends EasyMockTest {
@Before
public void setUp() throws Exception {
stateManager = createMock(StateManager.class);
- storage = DbUtil.createStorage();
+ storage = MemStorageModule.newEmptyStorage();
auroraCronJob = createMock(AuroraCronJob.class);
injector = Guice.createInjector(
http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImplTest.java
index 81440f5..439cf3e 100644
--- a/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronJobManagerImplTest.java
@@ -31,9 +31,9 @@ import org.apache.aurora.scheduler.cron.CrontabEntry;
import org.apache.aurora.scheduler.cron.SanitizedCronJob;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
-import org.apache.aurora.scheduler.storage.db.DbUtil;
import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
import org.easymock.EasyMock;
import org.junit.Before;
import org.junit.Test;
@@ -59,7 +59,7 @@ public class CronJobManagerImplTest extends EasyMockTest {
@Before
public void setUp() {
- storage = DbUtil.createStorage();
+ storage = MemStorageModule.newEmptyStorage();
scheduler = createMock(Scheduler.class);
cronJobManager = new CronJobManagerImpl(storage, scheduler, TimeZone.getTimeZone("GMT"));
http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/test/java/org/apache/aurora/scheduler/http/MaintenanceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/http/MaintenanceTest.java b/src/test/java/org/apache/aurora/scheduler/http/MaintenanceTest.java
index f94b58b..cb037bd 100644
--- a/src/test/java/org/apache/aurora/scheduler/http/MaintenanceTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/http/MaintenanceTest.java
@@ -26,9 +26,9 @@ import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.scheduler.base.TaskTestUtil;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult.Quiet;
-import org.apache.aurora.scheduler.storage.db.DbUtil;
import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
import org.junit.Before;
import org.junit.Test;
@@ -44,7 +44,7 @@ public class MaintenanceTest {
@Before
public void setUp() {
- storage = DbUtil.createStorage();
+ storage = MemStorageModule.newEmptyStorage();
maintenance = new Maintenance(storage);
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
index af6bdff..7a4525a 100644
--- a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
@@ -51,9 +51,9 @@ import org.apache.aurora.scheduler.state.PubsubTestUtil;
import org.apache.aurora.scheduler.state.TaskAssigner;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
-import org.apache.aurora.scheduler.storage.db.DbUtil;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
import org.apache.aurora.scheduler.testing.FakeStatsProvider;
import org.easymock.EasyMock;
@@ -310,7 +310,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
@Test
public void testIgnoresThrottledTasks() throws Exception {
// Ensures that tasks in THROTTLED state are not considered part of the active job state.
- Storage memStorage = DbUtil.createStorage();
+ Storage memStorage = MemStorageModule.newEmptyStorage();
Injector injector = getInjector(memStorage);
scheduler = injector.getInstance(TaskScheduler.class);
http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java
index c56b6f9..8e19794 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/LockManagerImplTest.java
@@ -23,9 +23,9 @@ import org.apache.aurora.gen.Lock;
import org.apache.aurora.gen.LockKey;
import org.apache.aurora.scheduler.base.JobKeys;
import org.apache.aurora.scheduler.state.LockManager.LockException;
-import org.apache.aurora.scheduler.storage.db.DbUtil;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.ILock;
+import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -55,7 +55,7 @@ public class LockManagerImplTest extends EasyMockTest {
UUIDGenerator tokenGenerator = createMock(UUIDGenerator.class);
expect(tokenGenerator.createNew()).andReturn(TOKEN).anyTimes();
- lockManager = new LockManagerImpl(DbUtil.createStorage(), clock, tokenGenerator);
+ lockManager = new LockManagerImpl(MemStorageModule.newEmptyStorage(), clock, tokenGenerator);
}
@Test
http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
index 24176fe..0366cd6 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/StateManagerImplTest.java
@@ -51,11 +51,11 @@ import org.apache.aurora.scheduler.scheduling.RescheduleCalculator;
import org.apache.aurora.scheduler.storage.AttributeStore;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
-import org.apache.aurora.scheduler.storage.db.DbUtil;
import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
import org.apache.mesos.v1.Protos.AgentID;
import org.easymock.Capture;
import org.easymock.EasyMock;
@@ -111,7 +111,7 @@ public class StateManagerImplTest extends EasyMockTest {
eventSink = createMock(EventSink.class);
rescheduleCalculator = createMock(RescheduleCalculator.class);
// TODO(William Farner): Use a mocked storage.
- storage = DbUtil.createStorage();
+ storage = MemStorageModule.newEmptyStorage();
stateManager = new StateManagerImpl(
clock,
driver,
http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/test/java/org/apache/aurora/scheduler/stats/ResourceCounterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/stats/ResourceCounterTest.java b/src/test/java/org/apache/aurora/scheduler/stats/ResourceCounterTest.java
index d73656d..04350fa 100644
--- a/src/test/java/org/apache/aurora/scheduler/stats/ResourceCounterTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/stats/ResourceCounterTest.java
@@ -35,10 +35,10 @@ import org.apache.aurora.scheduler.resources.ResourceBag;
import org.apache.aurora.scheduler.resources.ResourceTestUtil;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
-import org.apache.aurora.scheduler.storage.db.DbUtil;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
import org.junit.Before;
import org.junit.Test;
@@ -74,7 +74,7 @@ public class ResourceCounterTest {
@Before
public void setUp() throws Exception {
- storage = DbUtil.createStorage();
+ storage = MemStorageModule.newEmptyStorage();
resourceCounter = new ResourceCounter(storage);
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/f2755e1c/src/test/java/org/apache/aurora/scheduler/storage/AbstractAttributeStoreTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/AbstractAttributeStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/AbstractAttributeStoreTest.java
new file mode 100644
index 0000000..34db54b
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/storage/AbstractAttributeStoreTest.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage;
+
+import java.io.IOException;
+import java.util.Set;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.gen.Attribute;
+import org.apache.aurora.gen.HostAttributes;
+import org.apache.aurora.gen.MaintenanceMode;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.scheduler.base.JobKeys;
+import org.apache.aurora.scheduler.base.TaskTestUtil;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.MaintenanceMode.DRAINED;
+import static org.junit.Assert.assertEquals;
+
+public abstract class AbstractAttributeStoreTest {
+
+ private static final String HOST_A = "hostA";
+ private static final String HOST_B = "hostB";
+ private static final String SLAVE_A = "slaveA";
+ private static final String SLAVE_B = "slaveB";
+ private static final Attribute ATTR1 = new Attribute("attr1", ImmutableSet.of("a", "b", "c"));
+ private static final Attribute ATTR2 = new Attribute("attr2", ImmutableSet.of("d", "e", "f"));
+ private static final Attribute ATTR3 = new Attribute("attr3", ImmutableSet.of("a", "d", "g"));
+ private static final IHostAttributes HOST_A_ATTRS =
+ IHostAttributes.build(new HostAttributes(HOST_A, ImmutableSet.of(ATTR1, ATTR2))
+ .setSlaveId(SLAVE_A)
+ .setAttributes(ImmutableSet.of())
+ .setMode(MaintenanceMode.NONE));
+ private static final IHostAttributes HOST_B_ATTRS =
+ IHostAttributes.build(new HostAttributes(HOST_B, ImmutableSet.of(ATTR2, ATTR3))
+ .setSlaveId(SLAVE_B)
+ .setAttributes(ImmutableSet.of())
+ .setMode(MaintenanceMode.DRAINING));
+
+ private Storage storage;
+
+ @Before
+ public void setUp() throws IOException {
+ storage = createStorage();
+ }
+
+ protected abstract Storage createStorage();
+
+ @Test
+ public void testCrud() {
+ assertEquals(Optional.absent(), read(HOST_A));
+ assertEquals(ImmutableSet.of(), readAll());
+
+ insert(HOST_A_ATTRS);
+ assertEquals(Optional.of(HOST_A_ATTRS), read(HOST_A));
+ assertEquals(ImmutableSet.of(HOST_A_ATTRS), readAll());
+
+ insert(HOST_B_ATTRS);
+ insert(HOST_B_ATTRS); // Double insert should be allowed.
+ assertEquals(Optional.of(HOST_B_ATTRS), read(HOST_B));
+ assertEquals(ImmutableSet.of(HOST_A_ATTRS, HOST_B_ATTRS), readAll());
+
+ IHostAttributes updatedA = IHostAttributes.build(
+ HOST_A_ATTRS.newBuilder().setAttributes(ImmutableSet.of(ATTR1, ATTR3)));
+ insert(updatedA);
+ assertEquals(Optional.of(updatedA), read(HOST_A));
+ assertEquals(ImmutableSet.of(updatedA, HOST_B_ATTRS), readAll());
+
+ IHostAttributes updatedMode = IHostAttributes.build(updatedA.newBuilder().setMode(DRAINED));
+ insert(updatedMode);
+ assertEquals(Optional.of(updatedMode), read(HOST_A));
+ assertEquals(ImmutableSet.of(updatedMode, HOST_B_ATTRS), readAll());
+
+ truncate();
+ assertEquals(Optional.absent(), read(HOST_A));
+ assertEquals(ImmutableSet.of(), readAll());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testEmptyAttributeValues() {
+ IHostAttributes attributes = IHostAttributes.build(HOST_A_ATTRS.newBuilder()
+ .setAttributes(ImmutableSet.of(new Attribute("attr1", ImmutableSet.of()))));
+ insert(attributes);
+ }
+
+ @Test
+ public void testNoAttributes() {
+ IHostAttributes attributes = IHostAttributes.build(
+ HOST_A_ATTRS.newBuilder().setAttributes(ImmutableSet.of()));
+ insert(attributes);
+ assertEquals(Optional.of(attributes), read(HOST_A));
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testNoMode() {
+ HostAttributes noMode = HOST_A_ATTRS.newBuilder();
+ noMode.unsetMode();
+
+ insert(IHostAttributes.build(noMode));
+ }
+
+ @Test
+ public void testSaveAttributesEmpty() {
+ HostAttributes attributes = HOST_A_ATTRS.newBuilder();
+ attributes.unsetAttributes();
+
+ insert(IHostAttributes.build(attributes));
+ assertEquals(Optional.of(IHostAttributes.build(attributes)), read(HOST_A));
+ }
+
+ @Test
+ public void testSlaveIdChanges() {
+ insert(HOST_A_ATTRS);
+ IHostAttributes updated = IHostAttributes.build(HOST_A_ATTRS.newBuilder().setSlaveId(SLAVE_B));
+ insert(updated);
+ assertEquals(Optional.of(updated), read(HOST_A));
+ }
+
+ @Test
+ public void testUpdateAttributesWithRelations() {
+ // Test for regression of AURORA-1379, where host attribute mutation performed a delete,
+ // violating foreign key constraints.
+ insert(HOST_A_ATTRS);
+
+ ScheduledTask builder = TaskTestUtil.makeTask("a", JobKeys.from("role", "env", "job"))
+ .newBuilder();
+ builder.getAssignedTask()
+ .setSlaveHost(HOST_A_ATTRS.getHost())
+ .setSlaveId(HOST_A_ATTRS.getSlaveId());
+ final IScheduledTask taskA = IScheduledTask.build(builder);
+
+ storage.write((NoResult.Quiet)
+ storeProvider -> storeProvider.getUnsafeTaskStore().saveTasks(ImmutableSet.of(taskA)));
+
+ HostAttributes attributeBuilder = HOST_A_ATTRS.newBuilder().setMode(DRAINED);
+ attributeBuilder.addToAttributes(new Attribute("newAttr", ImmutableSet.of("a", "b")));
+ IHostAttributes hostAUpdated = IHostAttributes.build(attributeBuilder);
+ insert(hostAUpdated);
+ assertEquals(Optional.of(hostAUpdated), read(HOST_A));
+ }
+
+ private void insert(IHostAttributes attributes) {
+ storage.write(
+ storeProvider -> storeProvider.getAttributeStore().saveHostAttributes(attributes));
+ }
+
+ private Optional<IHostAttributes> read(String host) {
+ return storage.read(storeProvider -> storeProvider.getAttributeStore().getHostAttributes(host));
+ }
+
+ private Set<IHostAttributes> readAll() {
+ return storage.read(storeProvider -> storeProvider.getAttributeStore().getHostAttributes());
+ }
+
+ private void truncate() {
+ storage.write(
+ (NoResult.Quiet) storeProvider -> storeProvider.getAttributeStore().deleteHostAttributes());
+ }
+}