You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/01/27 23:51:01 UTC

git commit: AURORA-122: Add an abstraction for secondary indices, introduce an index on slave host.

Updated Branches:
  refs/heads/master 3a23a501a -> 4efa6748c


AURORA-122: Add an abstraction for secondary indices, introduce an index on slave host.

Bugs closed: AURORA-122

Reviewed at https://reviews.apache.org/r/17372/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/4efa6748
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/4efa6748
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/4efa6748

Branch: refs/heads/master
Commit: 4efa6748c949d76ea1c118fc7b19bb2490398219
Parents: 3a23a50
Author: Bill Farner <wf...@apache.org>
Authored: Mon Jan 27 14:50:48 2014 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Mon Jan 27 14:50:48 2014 -0800

----------------------------------------------------------------------
 .../aurora/scheduler/async/Preemptor.java       |   9 +-
 .../org/apache/aurora/scheduler/base/Tasks.java |  10 +
 .../aurora/scheduler/http/Maintenance.java      |   9 +-
 .../scheduler/storage/mem/MemTaskStore.java     | 215 +++++++++++++++----
 .../scheduler/storage/mem/MemTaskStoreTest.java |  65 ++++++
 5 files changed, 245 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4efa6748/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java b/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
index f344cb7..c11f483 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
@@ -173,13 +173,6 @@ public interface Preemptor {
           }
         };
 
-    private static final Function<IAssignedTask, String> TASK_TO_HOST =
-        new Function<IAssignedTask, String>() {
-          @Override public String apply(IAssignedTask input) {
-            return input.getSlaveHost();
-          }
-        };
-
     private static Predicate<IAssignedTask> canPreempt(final IAssignedTask pending) {
       return new Predicate<IAssignedTask>() {
         @Override public boolean apply(IAssignedTask possibleVictim) {
@@ -227,7 +220,7 @@ public interface Preemptor {
       // This enforces the precondition that all of the resources are from the same host. We need to
       // get the host for the schedulingFilter.
       Set<String> hosts = ImmutableSet.<String>builder()
-          .addAll(Iterables.transform(possibleVictims, TASK_TO_HOST))
+          .addAll(Iterables.transform(possibleVictims, Tasks.ASSIGNED_TO_SLAVE_HOST))
           .addAll(Iterables.transform(offers, OFFER_TO_HOST)).build();
 
       String host = Iterables.getOnlyElement(hosts);

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4efa6748/src/main/java/org/apache/aurora/scheduler/base/Tasks.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/Tasks.java b/src/main/java/org/apache/aurora/scheduler/base/Tasks.java
index 06a19d8..2216b20 100644
--- a/src/main/java/org/apache/aurora/scheduler/base/Tasks.java
+++ b/src/main/java/org/apache/aurora/scheduler/base/Tasks.java
@@ -94,6 +94,16 @@ public final class Tasks {
   public static final Function<IScheduledTask, IJobKey> SCHEDULED_TO_JOB_KEY =
       Functions.compose(ASSIGNED_TO_JOB_KEY, SCHEDULED_TO_ASSIGNED);
 
+  public static final Function<IAssignedTask, String> ASSIGNED_TO_SLAVE_HOST =
+      new Function<IAssignedTask, String>() {
+        @Override public String apply(IAssignedTask task) {
+          return task.getSlaveHost();
+        }
+      };
+
+  public static final Function<IScheduledTask, String> SCHEDULED_TO_SLAVE_HOST =
+      Functions.compose(ASSIGNED_TO_SLAVE_HOST, SCHEDULED_TO_ASSIGNED);
+
   /**
    * Different states that an active task may be in.
    */

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4efa6748/src/main/java/org/apache/aurora/scheduler/http/Maintenance.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/Maintenance.java b/src/main/java/org/apache/aurora/scheduler/http/Maintenance.java
index 5ae63e0..fbba9f8 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/Maintenance.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/Maintenance.java
@@ -81,17 +81,10 @@ public class Maintenance {
       drainingTasks.addAll(provider.getTaskStore().fetchTasks(Query.slaveScoped(host).active()));
     }
     return Multimaps.transformValues(
-        Multimaps.index(drainingTasks.build(), TASK_TO_HOST),
+        Multimaps.index(drainingTasks.build(), Tasks.SCHEDULED_TO_SLAVE_HOST),
         Tasks.SCHEDULED_TO_ID);
   }
 
-  private static final Function<IScheduledTask, String> TASK_TO_HOST =
-      new Function<IScheduledTask, String>() {
-        @Override public String apply(IScheduledTask task) {
-          return task.getAssignedTask().getSlaveHost();
-        }
-      };
-
   private static final Function<HostAttributes, String> HOST_NAME =
       new Function<HostAttributes, String>() {
         @Override public String apply(HostAttributes attributes) {

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4efa6748/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java
index f418c6c..eaf18dc 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java
@@ -15,7 +15,7 @@
  */
 package org.apache.aurora.scheduler.storage.mem;
 
-import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
@@ -27,6 +27,7 @@ import com.google.common.base.Functions;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableList;
@@ -71,17 +72,44 @@ class MemTaskStore implements TaskStore.Mutable {
 
   private final long slowQueryThresholdNanos = SLOW_QUERY_LOG_THRESHOLD.get().as(Time.NANOSECONDS);
 
+  private static final Function<Query.Builder, Optional<IJobKey>> QUERY_TO_JOB_KEY =
+      new Function<Query.Builder, Optional<IJobKey>>() {
+        @Override public Optional<IJobKey> apply(Query.Builder query) {
+          return JobKeys.from(query);
+        }
+      };
+  private static final Function<Query.Builder, Optional<String>> QUERY_TO_SLAVE_HOST =
+      new Function<Query.Builder, Optional<String>>() {
+        @Override public Optional<String> apply(Query.Builder query) {
+          return Optional.fromNullable(query.get().getSlaveHost());
+        }
+      };
+
+  // Since this class operates under the API and umbrella of {@link Storage}, it is expected to be
+  // thread-safe but not necessarily strongly-consistent unless the externally-controlled storage
+  // lock is secured.  To adhere to that, these data structures are individually thread-safe, but
+  // we don't lock across them because of the relaxed consistency guarantees.
+  // For this reason, the secondary indices store references to Task objects (as opposed to storing
+  // secondary to primary key mappings).  This ensures that in the face of weak consistency, query
+  // results are sane.  Otherwise, you could query for secondary key = v1 and get a result with
+  // secondary key value = v2.
   private final Map<String, Task> tasks = Maps.newConcurrentMap();
-  private final Multimap<IJobKey, String> tasksByJobKey =
-      Multimaps.synchronizedSetMultimap(HashMultimap.<IJobKey, String>create());
+  private final List<SecondaryIndex<?>> secondaryIndices = ImmutableList.of(
+      new SecondaryIndex<>(
+          Functions.compose(Tasks.SCHEDULED_TO_JOB_KEY, TO_SCHEDULED),
+          QUERY_TO_JOB_KEY,
+          Stats.exportLong("task_queries_by_job")),
+      new SecondaryIndex<>(
+          Functions.compose(Tasks.SCHEDULED_TO_SLAVE_HOST, TO_SCHEDULED),
+          QUERY_TO_SLAVE_HOST,
+          Stats.exportLong("task_queries_by_host")));
 
   // An interner is used here to collapse equivalent TaskConfig instances into canonical instances.
   // Ideally this would fall out of the object hierarchy (TaskConfig being associated with the job
   // rather than the task), but we intuit this detail here for performance reasons.
-  private final Interner<TaskConfig, String> configInterner = new Interner<TaskConfig, String>();
+  private final Interner<TaskConfig, String> configInterner = new Interner<>();
 
   private final AtomicLong taskQueriesById = Stats.exportLong("task_queries_by_id");
-  private final AtomicLong taskQueriesByJob = Stats.exportLong("task_queries_by_job");
   private final AtomicLong taskQueriesAll = Stats.exportLong("task_queries_all");
 
   @Timed("mem_storage_fetch_tasks")
@@ -90,7 +118,7 @@ class MemTaskStore implements TaskStore.Mutable {
     checkNotNull(query);
 
     long start = System.nanoTime();
-    ImmutableSet<IScheduledTask> result = matches(query.get()).toSet();
+    ImmutableSet<IScheduledTask> result = matches(query).transform(TO_SCHEDULED).toSet();
     long durationNanos = System.nanoTime() - start;
     Level level = (durationNanos >= slowQueryThresholdNanos) ? Level.INFO : Level.FINE;
     if (LOG.isLoggable(level)) {
@@ -117,20 +145,18 @@ class MemTaskStore implements TaskStore.Mutable {
 
     Iterable<Task> canonicalized = Iterables.transform(newTasks, toTask);
     tasks.putAll(Maps.uniqueIndex(canonicalized, TO_ID));
-    tasksByJobKey.putAll(taskIdsByJobKey(canonicalized));
-  }
-
-  private Multimap<IJobKey, String> taskIdsByJobKey(Iterable<Task> toIndex) {
-    return Multimaps.transformValues(
-        Multimaps.index(toIndex, Functions.compose(Tasks.SCHEDULED_TO_JOB_KEY, TO_SCHEDULED)),
-        TO_ID);
+    for (SecondaryIndex<?> index : secondaryIndices) {
+      index.insert(canonicalized);
+    }
   }
 
   @Timed("mem_storage_delete_all_tasks")
   @Override
   public void deleteAllTasks() {
     tasks.clear();
-    tasksByJobKey.clear();
+    for (SecondaryIndex<?> index : secondaryIndices) {
+      index.clear();
+    }
     configInterner.clear();
   }
 
@@ -142,7 +168,9 @@ class MemTaskStore implements TaskStore.Mutable {
     for (String id : taskIds) {
       Task removed = tasks.remove(id);
       if (removed != null) {
-        tasksByJobKey.remove(Tasks.SCHEDULED_TO_JOB_KEY.apply(removed.task), id);
+        for (SecondaryIndex<?> index : secondaryIndices) {
+          index.remove(removed);
+        }
         configInterner.removeAssociation(removed.task.getAssignedTask().getTask().newBuilder(), id);
       }
     }
@@ -158,13 +186,18 @@ class MemTaskStore implements TaskStore.Mutable {
     checkNotNull(mutator);
 
     ImmutableSet.Builder<IScheduledTask> mutated = ImmutableSet.builder();
-    for (IScheduledTask original : matches(query.get())) {
-      IScheduledTask maybeMutated = mutator.apply(original);
-      if (!original.equals(maybeMutated)) {
+    for (Task original : matches(query).toList()) {
+      IScheduledTask maybeMutated = mutator.apply(original.task);
+      if (!original.task.equals(maybeMutated)) {
         Preconditions.checkState(
-            Tasks.id(original).equals(Tasks.id(maybeMutated)),
+            Tasks.id(original.task).equals(Tasks.id(maybeMutated)),
             "A task's ID may not be mutated.");
-        tasks.put(Tasks.id(maybeMutated), toTask.apply(maybeMutated));
+        Task newCanonicalTask = toTask.apply(maybeMutated);
+        tasks.put(Tasks.id(maybeMutated), newCanonicalTask);
+        for (SecondaryIndex<?> index : secondaryIndices) {
+          index.replace(original, newCanonicalTask);
+        }
+
         mutated.add(maybeMutated);
       }
     }
@@ -189,9 +222,10 @@ class MemTaskStore implements TaskStore.Mutable {
     }
   }
 
-  private static Predicate<IScheduledTask> queryFilter(final TaskQuery query) {
-    return new Predicate<IScheduledTask>() {
-      @Override public boolean apply(IScheduledTask task) {
+  private static Predicate<Task> queryFilter(final TaskQuery query) {
+    return new Predicate<Task>() {
+      @Override public boolean apply(Task canonicalTask) {
+        IScheduledTask task = canonicalTask.task;
         ITaskConfig config = task.getAssignedTask().getTask();
         if (query.getOwner() != null) {
           if (!StringUtils.isBlank(query.getOwner().getRole())) {
@@ -244,37 +278,37 @@ class MemTaskStore implements TaskStore.Mutable {
   }
 
   private Iterable<Task> fromIdIndex(Iterable<String> taskIds) {
-    ImmutableList.Builder<Task> matches = ImmutableList.builder();
-    for (String id : taskIds) {
-      Task match = tasks.get(id);
-      if (match != null) {
-        matches.add(match);
-      }
-    }
-    return matches.build();
+    return FluentIterable.from(taskIds)
+        .transform(Functions.forMap(tasks, null))
+        .filter(Predicates.notNull())
+        .toList();
   }
 
-  private FluentIterable<IScheduledTask> matches(TaskQuery query) {
+  private FluentIterable<Task> matches(Query.Builder query) {
     // Apply the query against the working set.
-    Iterable<Task> from;
-    Optional<IJobKey> jobKey = JobKeys.from(Query.arbitrary(query));
-    if (query.isSetTaskIds()) {
+    Optional<? extends Iterable<Task>> from = Optional.absent();
+    if (query.get().isSetTaskIds()) {
       taskQueriesById.incrementAndGet();
-      from = fromIdIndex(query.getTaskIds());
-    } else if (jobKey.isPresent()) {
-      taskQueriesByJob.incrementAndGet();
-      Collection<String> taskIds = tasksByJobKey.get(jobKey.get());
-      if (taskIds == null) {
-        from = ImmutableList.of();
-      } else {
-        from = fromIdIndex(taskIds);
-      }
+      from = Optional.of(fromIdIndex(query.get().getTaskIds()));
     } else {
-      taskQueriesAll.incrementAndGet();
-      from = tasks.values();
+      for (SecondaryIndex<?> index : secondaryIndices) {
+        from = index.getMatches(query);
+        if (from.isPresent()) {
+          // Note: we could leverage multiple indexes here if the query applies to them, by
+          // choosing to intersect the results.  Given current indexes and query profile, this is
+          // unlikely to offer much improvement, though.
+          break;
+        }
+      }
+
+      // No indices match, fall back to a full scan.
+      if (!from.isPresent()) {
+        taskQueriesAll.incrementAndGet();
+        from = Optional.of(tasks.values());
+      }
     }
 
-    return FluentIterable.from(from).transform(TO_SCHEDULED).filter(queryFilter(query));
+    return FluentIterable.from(from.get()).filter(queryFilter(query.get()));
   }
 
   private static final Function<Task, IScheduledTask> TO_SCHEDULED =
@@ -299,5 +333,92 @@ class MemTaskStore implements TaskStore.Mutable {
       builder.getAssignedTask().setTask(canonical);
       this.task = IScheduledTask.build(builder);
     }
+
+    @Override
+    public boolean equals(Object o) {
+      if (!(o instanceof Task)) {
+        return false;
+      }
+
+      Task other = (Task) o;
+      return task.equals(other.task);
+    }
+
+    @Override
+    public int hashCode() {
+      return task.hashCode();
+    }
+  }
+
+  /**
+   * A non-unique secondary index on the task store.  Maps a custom key type to a set of tasks.
+   *
+   * @param <K> Key type.
+   */
+  private static class SecondaryIndex<K> {
+    private final Multimap<K, Task> index =
+        Multimaps.synchronizedSetMultimap(HashMultimap.<K, Task>create());
+    private final Function<Task, K> indexer;
+    private final Function<Query.Builder, Optional<K>> queryExtractor;
+    private final AtomicLong hitCount;
+
+    /**
+     * Creates a secondary index that will extract keys from tasks using the provided indexer.
+     *
+     * @param indexer Indexing function.
+     * @param queryExtractor Function to extract the key relevant to a query.
+     * @param hitCount Counter for the number of times the secondary index applies to a query.
+     */
+    SecondaryIndex(
+        Function<Task, K> indexer,
+        Function<Query.Builder, Optional<K>> queryExtractor,
+        AtomicLong hitCount) {
+
+      this.indexer = indexer;
+      this.queryExtractor = queryExtractor;
+      this.hitCount = hitCount;
+    }
+
+    void insert(Iterable<Task> tasks) {
+      for (Task task : tasks) {
+        insert(task);
+      }
+    }
+
+    void insert(Task task) {
+      K key = indexer.apply(task);
+      if (key != null) {
+        index.put(key, task);
+      }
+    }
+
+    void clear() {
+      index.clear();
+    }
+
+    void remove(Task task) {
+      K key = indexer.apply(task);
+      if (key != null) {
+        index.remove(key, task);
+      }
+    }
+
+    void replace(Task old, Task replacement) {
+      synchronized (index) {
+        remove(old);
+        insert(replacement);
+      }
+    }
+
+    private final Function<K, Iterable<Task>> lookup = new Function<K, Iterable<Task>>() {
+      @Override public Iterable<Task> apply(K key) {
+        hitCount.incrementAndGet();
+        return index.get(key);
+      }
+    };
+
+    Optional<Iterable<Task>> getMatches(Query.Builder query) {
+      return queryExtractor.apply(query).transform(lookup);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/4efa6748/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java
index a1c535f..ff5bf89 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/mem/MemTaskStoreTest.java
@@ -18,6 +18,8 @@ package org.apache.aurora.scheduler.storage.mem;
 import java.util.Map;
 import java.util.Set;
 
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
@@ -242,6 +244,69 @@ public class MemTaskStoreTest {
         identityMap);
   }
 
+  private static IScheduledTask setHost(IScheduledTask task, Optional<String> host) {
+    ScheduledTask builder = task.newBuilder();
+    builder.getAssignedTask().setSlaveHost(host.orNull());
+    return IScheduledTask.build(builder);
+  }
+
+  @Test
+  public void testAddSlaveHost() {
+    final IScheduledTask a = makeTask("a", "role", "env", "job");
+    store.saveTasks(ImmutableSet.of(a));
+    String host = "slaveA";
+    assertQueryResults(Query.slaveScoped(host));
+
+    final IScheduledTask b = setHost(a, Optional.of(host));
+    Set<IScheduledTask> result = store.mutateTasks(Query.taskScoped(Tasks.id(a)),
+        new Function<IScheduledTask, IScheduledTask>() {
+          @Override public IScheduledTask apply(IScheduledTask task) {
+            assertEquals(a, task);
+            return b;
+          }
+        });
+    assertEquals(ImmutableSet.of(b), result);
+    assertQueryResults(Query.slaveScoped(host), b);
+
+    // Unrealistic behavior, but proving that the secondary index can handle key mutations.
+    String host2 = "slaveA2";
+    final IScheduledTask c = setHost(b, Optional.of(host2));
+    Set<IScheduledTask> result2 = store.mutateTasks(Query.taskScoped(Tasks.id(a)),
+        new Function<IScheduledTask, IScheduledTask>() {
+          @Override public IScheduledTask apply(IScheduledTask task) {
+            assertEquals(b, task);
+            return c;
+          }
+        });
+    assertEquals(ImmutableSet.of(c), result2);
+    assertQueryResults(Query.slaveScoped(host2), c);
+
+    store.deleteTasks(ImmutableSet.of(Tasks.id(a)));
+    assertQueryResults(Query.slaveScoped(host));
+  }
+
+  @Test
+  public void testUnsetSlaveHost() {
+    // Unrealistic behavior, but proving that the secondary index does not become stale.
+
+    String host = "slaveA";
+    final IScheduledTask a = setHost(makeTask("a", "role", "env", "job"), Optional.of(host));
+    store.saveTasks(ImmutableSet.of(a));
+    assertQueryResults(Query.slaveScoped(host), a);
+
+    final IScheduledTask b = setHost(a, Optional.<String>absent());
+    Set<IScheduledTask> result = store.mutateTasks(Query.taskScoped(Tasks.id(a)),
+        new Function<IScheduledTask, IScheduledTask>() {
+          @Override public IScheduledTask apply(IScheduledTask task) {
+            assertEquals(a, task);
+            return b;
+          }
+        });
+    assertEquals(ImmutableSet.of(b), result);
+    assertQueryResults(Query.slaveScoped(host));
+    assertQueryResults(Query.taskScoped(Tasks.id(b)), b);
+  }
+
   private void assertStoreContents(IScheduledTask... tasks) {
     assertQueryResults(Query.unscoped(), tasks);
   }