You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/01/27 23:51:01 UTC
git commit: AURORA-122: Add an abstraction for secondary indices,
introduce an index on slave host.
Updated Branches:
refs/heads/master 3a23a501a -> 4efa6748c
AURORA-122: Add an abstraction for secondary indices, introduce an index on slave host.
Bugs closed: AURORA-122
Reviewed at https://reviews.apache.org/r/17372/
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/4efa6748
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/4efa6748
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/4efa6748
Branch: refs/heads/master
Commit: 4efa6748c949d76ea1c118fc7b19bb2490398219
Parents: 3a23a50
Author: Bill Farner <wf...@apache.org>
Authored: Mon Jan 27 14:50:48 2014 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Mon Jan 27 14:50:48 2014 -0800
----------------------------------------------------------------------
.../aurora/scheduler/async/Preemptor.java | 9 +-
.../org/apache/aurora/scheduler/base/Tasks.java | 10 +
.../aurora/scheduler/http/Maintenance.java | 9 +-
.../scheduler/storage/mem/MemTaskStore.java | 215 +++++++++++++++----
.../scheduler/storage/mem/MemTaskStoreTest.java | 65 ++++++
5 files changed, 245 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4efa6748/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java b/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
index f344cb7..c11f483 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
@@ -173,13 +173,6 @@ public interface Preemptor {
}
};
- private static final Function<IAssignedTask, String> TASK_TO_HOST =
- new Function<IAssignedTask, String>() {
- @Override public String apply(IAssignedTask input) {
- return input.getSlaveHost();
- }
- };
-
private static Predicate<IAssignedTask> canPreempt(final IAssignedTask pending) {
return new Predicate<IAssignedTask>() {
@Override public boolean apply(IAssignedTask possibleVictim) {
@@ -227,7 +220,7 @@ public interface Preemptor {
// This enforces the precondition that all of the resources are from the same host. We need to
// get the host for the schedulingFilter.
Set<String> hosts = ImmutableSet.<String>builder()
- .addAll(Iterables.transform(possibleVictims, TASK_TO_HOST))
+ .addAll(Iterables.transform(possibleVictims, Tasks.ASSIGNED_TO_SLAVE_HOST))
.addAll(Iterables.transform(offers, OFFER_TO_HOST)).build();
String host = Iterables.getOnlyElement(hosts);
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4efa6748/src/main/java/org/apache/aurora/scheduler/base/Tasks.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/Tasks.java b/src/main/java/org/apache/aurora/scheduler/base/Tasks.java
index 06a19d8..2216b20 100644
--- a/src/main/java/org/apache/aurora/scheduler/base/Tasks.java
+++ b/src/main/java/org/apache/aurora/scheduler/base/Tasks.java
@@ -94,6 +94,16 @@ public final class Tasks {
public static final Function<IScheduledTask, IJobKey> SCHEDULED_TO_JOB_KEY =
Functions.compose(ASSIGNED_TO_JOB_KEY, SCHEDULED_TO_ASSIGNED);
+ public static final Function<IAssignedTask, String> ASSIGNED_TO_SLAVE_HOST =
+ new Function<IAssignedTask, String>() {
+ @Override public String apply(IAssignedTask task) {
+ return task.getSlaveHost();
+ }
+ };
+
+ public static final Function<IScheduledTask, String> SCHEDULED_TO_SLAVE_HOST =
+ Functions.compose(ASSIGNED_TO_SLAVE_HOST, SCHEDULED_TO_ASSIGNED);
+
/**
* Different states that an active task may be in.
*/
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4efa6748/src/main/java/org/apache/aurora/scheduler/http/Maintenance.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/Maintenance.java b/src/main/java/org/apache/aurora/scheduler/http/Maintenance.java
index 5ae63e0..fbba9f8 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/Maintenance.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/Maintenance.java
@@ -81,17 +81,10 @@ public class Maintenance {
drainingTasks.addAll(provider.getTaskStore().fetchTasks(Query.slaveScoped(host).active()));
}
return Multimaps.transformValues(
- Multimaps.index(drainingTasks.build(), TASK_TO_HOST),
+ Multimaps.index(drainingTasks.build(), Tasks.SCHEDULED_TO_SLAVE_HOST),
Tasks.SCHEDULED_TO_ID);
}
- private static final Function<IScheduledTask, String> TASK_TO_HOST =
- new Function<IScheduledTask, String>() {
- @Override public String apply(IScheduledTask task) {
- return task.getAssignedTask().getSlaveHost();
- }
- };
-
private static final Function<HostAttributes, String> HOST_NAME =
new Function<HostAttributes, String>() {
@Override public String apply(HostAttributes attributes) {
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4efa6748/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java
index f418c6c..eaf18dc 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java
@@ -15,7 +15,7 @@
*/
package org.apache.aurora.scheduler.storage.mem;
-import java.util.Collection;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
@@ -27,6 +27,7 @@ import com.google.common.base.Functions;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
@@ -71,17 +72,44 @@ class MemTaskStore implements TaskStore.Mutable {
private final long slowQueryThresholdNanos = SLOW_QUERY_LOG_THRESHOLD.get().as(Time.NANOSECONDS);
+ private static final Function<Query.Builder, Optional<IJobKey>> QUERY_TO_JOB_KEY =
+ new Function<Query.Builder, Optional<IJobKey>>() {
+ @Override public Optional<IJobKey> apply(Query.Builder query) {
+ return JobKeys.from(query);
+ }
+ };
+ private static final Function<Query.Builder, Optional<String>> QUERY_TO_SLAVE_HOST =
+ new Function<Query.Builder, Optional<String>>() {
+ @Override public Optional<String> apply(Query.Builder query) {
+ return Optional.fromNullable(query.get().getSlaveHost());
+ }
+ };
+
+ // Since this class operates under the API and umbrella of {@link Storage}, it is expected to be
+ // thread-safe but not necessarily strongly-consistent unless the externally-controlled storage
+ // lock is secured. To adhere to that, these data structures are individually thread-safe, but
+ // we don't lock across them because of the relaxed consistency guarantees.
+ // For this reason, the secondary indices store references to Task objects (as opposed to storing
+ // secondary to primary key mappings). This ensures that in the face of weak consistency, query
+ // results are sane. Otherwise, you could query for secondary key = v1 and get a result with
+ // secondary key value = v2.
private final Map<String, Task> tasks = Maps.newConcurrentMap();
- private final Multimap<IJobKey, String> tasksByJobKey =
- Multimaps.synchronizedSetMultimap(HashMultimap.<IJobKey, String>create());
+ private final List<SecondaryIndex<?>> secondaryIndices = ImmutableList.of(
+ new SecondaryIndex<>(
+ Functions.compose(Tasks.SCHEDULED_TO_JOB_KEY, TO_SCHEDULED),
+ QUERY_TO_JOB_KEY,
+ Stats.exportLong("task_queries_by_job")),
+ new SecondaryIndex<>(
+ Functions.compose(Tasks.SCHEDULED_TO_SLAVE_HOST, TO_SCHEDULED),
+ QUERY_TO_SLAVE_HOST,
+ Stats.exportLong("task_queries_by_host")));
// An interner is used here to collapse equivalent TaskConfig instances into canonical instances.
// Ideally this would fall out of the object hierarchy (TaskConfig being associated with the job
// rather than the task), but we intuit this detail here for performance reasons.
- private final Interner<TaskConfig, String> configInterner = new Interner<TaskConfig, String>();
+ private final Interner<TaskConfig, String> configInterner = new Interner<>();
private final AtomicLong taskQueriesById = Stats.exportLong("task_queries_by_id");
- private final AtomicLong taskQueriesByJob = Stats.exportLong("task_queries_by_job");
private final AtomicLong taskQueriesAll = Stats.exportLong("task_queries_all");
@Timed("mem_storage_fetch_tasks")
@@ -90,7 +118,7 @@ class MemTaskStore implements TaskStore.Mutable {
checkNotNull(query);
long start = System.nanoTime();
- ImmutableSet<IScheduledTask> result = matches(query.get()).toSet();
+ ImmutableSet<IScheduledTask> result = matches(query).transform(TO_SCHEDULED).toSet();
long durationNanos = System.nanoTime() - start;
Level level = (durationNanos >= slowQueryThresholdNanos) ? Level.INFO : Level.FINE;
if (LOG.isLoggable(level)) {
@@ -117,20 +145,18 @@ class MemTaskStore implements TaskStore.Mutable {
Iterable<Task> canonicalized = Iterables.transform(newTasks, toTask);
tasks.putAll(Maps.uniqueIndex(canonicalized, TO_ID));
- tasksByJobKey.putAll(taskIdsByJobKey(canonicalized));
- }
-
- private Multimap<IJobKey, String> taskIdsByJobKey(Iterable<Task> toIndex) {
- return Multimaps.transformValues(
- Multimaps.index(toIndex, Functions.compose(Tasks.SCHEDULED_TO_JOB_KEY, TO_SCHEDULED)),
- TO_ID);
+ for (SecondaryIndex<?> index : secondaryIndices) {
+ index.insert(canonicalized);
+ }
}
@Timed("mem_storage_delete_all_tasks")
@Override
public void deleteAllTasks() {
tasks.clear();
- tasksByJobKey.clear();
+ for (SecondaryIndex<?> index : secondaryIndices) {
+ index.clear();
+ }
configInterner.clear();
}
@@ -142,7 +168,9 @@ class MemTaskStore implements TaskStore.Mutable {
for (String id : taskIds) {
Task removed = tasks.remove(id);
if (removed != null) {
- tasksByJobKey.remove(Tasks.SCHEDULED_TO_JOB_KEY.apply(removed.task), id);
+ for (SecondaryIndex<?> index : secondaryIndices) {
+ index.remove(removed);
+ }
configInterner.removeAssociation(removed.task.getAssignedTask().getTask().newBuilder(), id);
}
}
@@ -158,13 +186,18 @@ class MemTaskStore implements TaskStore.Mutable {
checkNotNull(mutator);
ImmutableSet.Builder<IScheduledTask> mutated = ImmutableSet.builder();
- for (IScheduledTask original : matches(query.get())) {
- IScheduledTask maybeMutated = mutator.apply(original);
- if (!original.equals(maybeMutated)) {
+ for (Task original : matches(query).toList()) {
+ IScheduledTask maybeMutated = mutator.apply(original.task);
+ if (!original.task.equals(maybeMutated)) {
Preconditions.checkState(
- Tasks.id(original).equals(Tasks.id(maybeMutated)),
+ Tasks.id(original.task).equals(Tasks.id(maybeMutated)),
"A task's ID may not be mutated.");
- tasks.put(Tasks.id(maybeMutated), toTask.apply(maybeMutated));
+ Task newCanonicalTask = toTask.apply(maybeMutated);
+ tasks.put(Tasks.id(maybeMutated), newCanonicalTask);
+ for (SecondaryIndex<?> index : secondaryIndices) {
+ index.replace(original, newCanonicalTask);
+ }
+
mutated.add(maybeMutated);
}
}
@@ -189,9 +222,10 @@ class MemTaskStore implements TaskStore.Mutable {
}
}
- private static Predicate<IScheduledTask> queryFilter(final TaskQuery query) {
- return new Predicate<IScheduledTask>() {
- @Override public boolean apply(IScheduledTask task) {
+ private static Predicate<Task> queryFilter(final TaskQuery query) {
+ return new Predicate<Task>() {
+ @Override public boolean apply(Task canonicalTask) {
+ IScheduledTask task = canonicalTask.task;
ITaskConfig config = task.getAssignedTask().getTask();
if (query.getOwner() != null) {
if (!StringUtils.isBlank(query.getOwner().getRole())) {
@@ -244,37 +278,37 @@ class MemTaskStore implements TaskStore.Mutable {
}
private Iterable<Task> fromIdIndex(Iterable<String> taskIds) {
- ImmutableList.Builder<Task> matches = ImmutableList.builder();
- for (String id : taskIds) {
- Task match = tasks.get(id);
- if (match != null) {
- matches.add(match);
- }
- }
- return matches.build();
+ return FluentIterable.from(taskIds)
+ .transform(Functions.forMap(tasks, null))
+ .filter(Predicates.notNull())
+ .toList();
}
- private FluentIterable<IScheduledTask> matches(TaskQuery query) {
+ private FluentIterable<Task> matches(Query.Builder query) {
// Apply the query against the working set.
- Iterable<Task> from;
- Optional<IJobKey> jobKey = JobKeys.from(Query.arbitrary(query));
- if (query.isSetTaskIds()) {
+ Optional<? extends Iterable<Task>> from = Optional.absent();
+ if (query.get().isSetTaskIds()) {
taskQueriesById.incrementAndGet();
- from = fromIdIndex(query.getTaskIds());
- } else if (jobKey.isPresent()) {
- taskQueriesByJob.incrementAndGet();
- Collection<String> taskIds = tasksByJobKey.get(jobKey.get());
- if (taskIds == null) {
- from = ImmutableList.of();
- } else {
- from = fromIdIndex(taskIds);
- }
+ from = Optional.of(fromIdIndex(query.get().getTaskIds()));
} else {
- taskQueriesAll.incrementAndGet();
- from = tasks.values();
+ for (SecondaryIndex<?> index : secondaryIndices) {
+ from = index.getMatches(query);
+ if (from.isPresent()) {
+ // Note: we could leverage multiple indexes here if the query applies to them, by
+ // choosing to intersect the results. Given current indexes and query profile, this is
+ // unlikely to offer much improvement, though.
+ break;
+ }
+ }
+
+ // No indices match, fall back to a full scan.
+ if (!from.isPresent()) {
+ taskQueriesAll.incrementAndGet();
+ from = Optional.of(tasks.values());
+ }
}
- return FluentIterable.from(from).transform(TO_SCHEDULED).filter(queryFilter(query));
+ return FluentIterable.from(from.get()).filter(queryFilter(query.get()));
}
private static final Function<Task, IScheduledTask> TO_SCHEDULED =
@@ -299,5 +333,92 @@ class MemTaskStore implements TaskStore.Mutable {
builder.getAssignedTask().setTask(canonical);
this.task = IScheduledTask.build(builder);
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof Task)) {
+ return false;
+ }
+
+ Task other = (Task) o;
+ return task.equals(other.task);
+ }
+
+ @Override
+ public int hashCode() {
+ return task.hashCode();
+ }
+ }
+
+ /**
+ * A non-unique secondary index on the task store. Maps a custom key type to a set of tasks.
+ *
+ * @param <K> Key type.
+ */
+ private static class SecondaryIndex<K> {
+ private final Multimap<K, Task> index =
+ Multimaps.synchronizedSetMultimap(HashMultimap.<K, Task>create());
+ private final Function<Task, K> indexer;
+ private final Function<Query.Builder, Optional<K>> queryExtractor;
+ private final AtomicLong hitCount;
+
+ /**
+ * Creates a secondary index that will extract keys from tasks using the provided indexer.
+ *
+ * @param indexer Indexing function.
+ * @param queryExtractor Function to extract the key relevant to a query.
+ * @param hitCount Counter for number of times the secondary index applies to a query.
+ */
+ SecondaryIndex(
+ Function<Task, K> indexer,
+ Function<Query.Builder, Optional<K>> queryExtractor,
+ AtomicLong hitCount) {
+
+ this.indexer = indexer;
+ this.queryExtractor = queryExtractor;
+ this.hitCount = hitCount;
+ }
+
+ void insert(Iterable<Task> tasks) {
+ for (Task task : tasks) {
+ insert(task);
+ }
+ }
+
+ void insert(Task task) {
+ K key = indexer.apply(task);
+ if (key != null) {
+ index.put(key, task);
+ }
+ }
+
+ void clear() {
+ index.clear();
+ }
+
+ void remove(Task task) {
+ K key = indexer.apply(task);
+ if (key != null) {
+ index.remove(key, task);
+ }
+ }
+
+ void replace(Task old, Task replacement) {
+ synchronized (index) {
+ remove(old);
+ insert(replacement);
+ }
+ }
+
+ private final Function<K, Iterable<Task>> lookup = new Function<K, Iterable<Task>>() {
+ @Override public Iterable<Task> apply(K key) {
+ hitCount.incrementAndGet();
+ return index.get(key);
+ }
+ };
+
+ Optional<Iterable<Task>> getMatches(Query.Builder query) {
+ return queryExtractor.apply(query).transform(lookup);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4efa6748/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java
index a1c535f..ff5bf89 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java
@@ -18,6 +18,8 @@ package org.apache.aurora.scheduler.storage.mem;
import java.util.Map;
import java.util.Set;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
@@ -242,6 +244,69 @@ public class MemTaskStoreTest {
identityMap);
}
+ private static IScheduledTask setHost(IScheduledTask task, Optional<String> host) {
+ ScheduledTask builder = task.newBuilder();
+ builder.getAssignedTask().setSlaveHost(host.orNull());
+ return IScheduledTask.build(builder);
+ }
+
+ @Test
+ public void testAddSlaveHost() {
+ final IScheduledTask a = makeTask("a", "role", "env", "job");
+ store.saveTasks(ImmutableSet.of(a));
+ String host = "slaveA";
+ assertQueryResults(Query.slaveScoped(host));
+
+ final IScheduledTask b = setHost(a, Optional.of(host));
+ Set<IScheduledTask> result = store.mutateTasks(Query.taskScoped(Tasks.id(a)),
+ new Function<IScheduledTask, IScheduledTask>() {
+ @Override public IScheduledTask apply(IScheduledTask task) {
+ assertEquals(a, task);
+ return b;
+ }
+ });
+ assertEquals(ImmutableSet.of(b), result);
+ assertQueryResults(Query.slaveScoped(host), b);
+
+ // Unrealistic behavior, but proving that the secondary index can handle key mutations.
+ String host2 = "slaveA2";
+ final IScheduledTask c = setHost(b, Optional.of(host2));
+ Set<IScheduledTask> result2 = store.mutateTasks(Query.taskScoped(Tasks.id(a)),
+ new Function<IScheduledTask, IScheduledTask>() {
+ @Override public IScheduledTask apply(IScheduledTask task) {
+ assertEquals(b, task);
+ return c;
+ }
+ });
+ assertEquals(ImmutableSet.of(c), result2);
+ assertQueryResults(Query.slaveScoped(host2), c);
+
+ store.deleteTasks(ImmutableSet.of(Tasks.id(a)));
+ assertQueryResults(Query.slaveScoped(host));
+ }
+
+ @Test
+ public void testUnsetSlaveHost() {
+ // Unrealistic behavior, but proving that the secondary index does not become stale.
+
+ String host = "slaveA";
+ final IScheduledTask a = setHost(makeTask("a", "role", "env", "job"), Optional.of(host));
+ store.saveTasks(ImmutableSet.of(a));
+ assertQueryResults(Query.slaveScoped(host), a);
+
+ final IScheduledTask b = setHost(a, Optional.<String>absent());
+ Set<IScheduledTask> result = store.mutateTasks(Query.taskScoped(Tasks.id(a)),
+ new Function<IScheduledTask, IScheduledTask>() {
+ @Override public IScheduledTask apply(IScheduledTask task) {
+ assertEquals(a, task);
+ return b;
+ }
+ });
+ assertEquals(ImmutableSet.of(b), result);
+ assertQueryResults(Query.slaveScoped(host));
+ assertQueryResults(Query.taskScoped(Tasks.id(b)), b);
+ }
+
private void assertStoreContents(IScheduledTask... tasks) {
assertQueryResults(Query.unscoped(), tasks);
}