You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2018/01/31 23:00:08 UTC

aurora git commit: Improve performance of MemTaskStore queries

Repository: aurora
Updated Branches:
  refs/heads/master 787ccfed5 -> 858552db0


Improve performance of MemTaskStore queries

Use `ArrayDeque` rather than `HashSet` to collect `fetchTasks` results, and use
an imperative style rather than a functional one.  I arrived at this result
after running benchmarks with some of the other usual suspects (`ArrayList`,
`LinkedList`).

This patch also enables the `stack` and `gc` (heap) profilers in JMH (more details
[here](http://hg.openjdk.java.net/codetools/jmh/file/25d8b2695bac/jmh-samples/src/main/java/org/openjdk/jmh/samples/JMHSample_35_Profilers.java)),
providing insight into the heap impact of changes.  I started this change with a
heap profiler as the primary motivation, and ended up using it to guide this
improvement.

Reviewed at https://reviews.apache.org/r/65303/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/858552db
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/858552db
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/858552db

Branch: refs/heads/master
Commit: 858552db0f433eca7ae615a961de82fc56717e5d
Parents: 787ccfe
Author: Bill Farner <wf...@apache.org>
Authored: Wed Jan 31 14:59:30 2018 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Wed Jan 31 15:00:01 2018 -0800

----------------------------------------------------------------------
 build.gradle                                    |  1 +
 .../aurora/benchmark/TaskStoreBenchmarks.java   | 45 ++++++++++--
 .../scheduler/storage/mem/MemTaskStore.java     | 75 +++++++++-----------
 3 files changed, 73 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/858552db/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 64af7ae..57355dc 100644
--- a/build.gradle
+++ b/build.gradle
@@ -602,6 +602,7 @@ jmh {
   jvmArgsPrepend = '-Xmx3g'
   humanOutputFile = project.file("$jmhHumanOutputPath")
   resultsFile = project.file("$buildDir/reports/jmh/results.txt")
+  profilers = ['gc', 'stack']
 }
 tasks.getByName('jmh').doLast() {
   println "Benchmark report generated: file://$jmhHumanOutputPath"

http://git-wip-us.apache.org/repos/asf/aurora/blob/858552db/src/jmh/java/org/apache/aurora/benchmark/TaskStoreBenchmarks.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/TaskStoreBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/TaskStoreBenchmarks.java
index 9ec9865..4e4d36b 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/TaskStoreBenchmarks.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/TaskStoreBenchmarks.java
@@ -16,7 +16,6 @@ package org.apache.aurora.benchmark;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
-import com.google.common.collect.Iterables;
 import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
 import com.google.inject.util.Modules;
@@ -27,6 +26,7 @@ import org.apache.aurora.common.util.testing.FakeClock;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.TaskStore;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
 import org.apache.aurora.scheduler.testing.FakeStatsProvider;
@@ -54,6 +54,7 @@ public class TaskStoreBenchmarks {
   @State(Scope.Thread)
   public abstract static class AbstractFetchTasksBenchmark {
     protected Storage storage;
+    protected IJobKey job;
     public abstract void setUp();
 
     @Param({"10000", "50000", "100000"})
@@ -63,6 +64,7 @@ public class TaskStoreBenchmarks {
       storage.write((Storage.MutateWork.NoResult.Quiet) storeProvider -> {
         TaskStore.Mutable taskStore = storeProvider.getUnsafeTaskStore();
         Set<IScheduledTask> tasks = new Tasks.Builder().build(size);
+        job = tasks.stream().findFirst().get().getAssignedTask().getTask().getJob();
         taskStore.saveTasks(tasks);
       });
     }
@@ -75,7 +77,7 @@ public class TaskStoreBenchmarks {
     }
   }
 
-  public static class MemFetchTasksBenchmark extends AbstractFetchTasksBenchmark {
+  public static class FetchAll extends AbstractFetchTasksBenchmark {
     @Setup(Level.Trial)
     @Override
     public void setUp() {
@@ -105,9 +107,42 @@ public class TaskStoreBenchmarks {
 
     @Benchmark
     public int run() {
-      // Iterate through results in case the result is lazily computed.
-      return Iterables.size(
-          storage.read(store -> store.getTaskStore().fetchTasks(Query.unscoped())));
+      return storage.read(store -> store.getTaskStore().fetchTasks(Query.unscoped())).size();
+    }
+  }
+
+  public static class IndexedFetchAndFilter extends AbstractFetchTasksBenchmark {
+    @Setup(Level.Trial)
+    @Override
+    public void setUp() {
+      storage = Guice.createInjector(
+          Modules.combine(
+              new MemStorageModule(),
+              new AbstractModule() {
+                @Override
+                protected void configure() {
+                  bind(StatsProvider.class).toInstance(new FakeStatsProvider());
+                  bind(Clock.class).toInstance(new FakeClock());
+                }
+              }))
+          .getInstance(Storage.class);
+
+    }
+
+    @Setup(Level.Iteration)
+    public void setUpIteration() {
+      createTasks(numTasks);
+    }
+
+    @TearDown(Level.Iteration)
+    public void tearDownIteration() {
+      deleteTasks();
+    }
+
+    @Benchmark
+    public int run() {
+      return storage.read(
+          store -> store.getTaskStore().fetchTasks(Query.instanceScoped(job, 0))).size();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/858552db/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java
index b59999c..5b10350 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java
@@ -17,6 +17,9 @@ import java.lang.annotation.ElementType;
 import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -28,12 +31,9 @@ import javax.inject.Qualifier;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
-import com.google.common.base.Functions;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
 import com.google.common.base.Supplier;
-import com.google.common.collect.FluentIterable;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
@@ -130,11 +130,11 @@ class MemTaskStore implements TaskStore.Mutable {
 
   @Timed("mem_storage_fetch_tasks")
   @Override
-  public ImmutableSet<IScheduledTask> fetchTasks(Query.Builder query) {
+  public Collection<IScheduledTask> fetchTasks(Query.Builder query) {
     requireNonNull(query);
 
     long start = System.nanoTime();
-    ImmutableSet<IScheduledTask> result = matches(query).toSet();
+    Collection<IScheduledTask> result = matches(query);
     long durationNanos = System.nanoTime() - start;
     boolean infoLevel = durationNanos >= slowQueryThresholdNanos;
     long time = Amount.of(durationNanos, Time.NANOSECONDS).as(Time.MILLISECONDS);
@@ -164,9 +164,9 @@ class MemTaskStore implements TaskStore.Mutable {
         "Proposed new tasks would create task ID collision.");
 
     Iterable<Task> canonicalized = Iterables.transform(newTasks, toTask);
-    tasks.putAll(Maps.uniqueIndex(canonicalized, TO_ID));
+    tasks.putAll(Maps.uniqueIndex(canonicalized, task -> Tasks.id(task.storedTask)));
     for (SecondaryIndex<?> index : secondaryIndices) {
-      index.insert(Iterables.transform(canonicalized, TO_SCHEDULED));
+      index.insert(Iterables.transform(canonicalized, task -> task.storedTask));
     }
   }
 
@@ -220,27 +220,22 @@ class MemTaskStore implements TaskStore.Mutable {
     });
   }
 
-  private static Predicate<Task> queryFilter(Query.Builder query) {
-    return Predicates.compose(
-        Util.queryFilter(query),
-        new Function<Task, IScheduledTask>() {
-          @Override
-          public IScheduledTask apply(Task canonicalTask) {
-            return canonicalTask.storedTask;
-          }
-        });
-  }
+  private Collection<IScheduledTask> fromIdIndex(
+      Iterable<String> taskIds,
+      Predicate<IScheduledTask> filter) {
 
-  private Iterable<Task> fromIdIndex(Iterable<String> taskIds) {
-    return FluentIterable.from(taskIds)
-        .transform(Functions.forMap(tasks, null))
-        .filter(Predicates.notNull())
-        .toList();
+    Collection<IScheduledTask> result = new ArrayDeque<>();
+    for (String id : taskIds) {
+      Task match = tasks.get(id);
+      if (match != null && filter.apply(match.storedTask)) {
+        result.add(match.storedTask);
+      }
+    }
+    return result;
   }
 
-  private FluentIterable<IScheduledTask> matches(Query.Builder query) {
-    // Apply the query against the working set.
-    Optional<? extends Iterable<Task>> from = Optional.empty();
+  private Collection<IScheduledTask> matches(Query.Builder query) {
+    Predicate<IScheduledTask> filter = Util.queryFilter(query);
     if (query.get().getTaskIds().isEmpty()) {
       for (SecondaryIndex<?> index : secondaryIndices) {
         Optional<Iterable<String>> indexMatch = index.getMatches(query);
@@ -248,31 +243,25 @@ class MemTaskStore implements TaskStore.Mutable {
           // Note: we could leverage multiple indexes here if the query applies to them, by
           // choosing to intersect the results.  Given current indexes and query profile, this is
           // unlikely to offer much improvement, though.
-          from = Optional.of(fromIdIndex(indexMatch.get()));
-          break;
+          return fromIdIndex(indexMatch.get(), filter);
         }
       }
 
       // No indices match, fall back to a full scan.
-      if (!from.isPresent()) {
-        taskQueriesAll.incrementAndGet();
-        from = Optional.of(tasks.values());
+      taskQueriesAll.incrementAndGet();
+      Collection<IScheduledTask> result = new ArrayDeque<>();
+      for (Task task : tasks.values()) {
+        if (filter.test(task.storedTask)) {
+          result.add(task.storedTask);
+        }
       }
+      return Collections.unmodifiableCollection(result);
     } else {
       taskQueriesById.incrementAndGet();
-      from = Optional.of(fromIdIndex(query.get().getTaskIds()));
+      return fromIdIndex(query.get().getTaskIds(), filter);
     }
-
-    return FluentIterable.from(from.get())
-        .filter(queryFilter(query))
-        .transform(TO_SCHEDULED);
   }
 
-  private static final Function<Task, IScheduledTask> TO_SCHEDULED = task -> task.storedTask;
-
-  private static final Function<Task, String> TO_ID =
-      Functions.compose(Tasks::id, TO_SCHEDULED);
-
   private static class Task {
     private final IScheduledTask storedTask;
 
@@ -388,13 +377,13 @@ class MemTaskStore implements TaskStore.Mutable {
           @Override
           public Iterable<String> apply(Set<K> keys) {
             hitCount.incrementAndGet();
-            ImmutableSet.Builder<String> builder = ImmutableSet.builder();
+            Collection<String> matches = new ArrayDeque<>();
             synchronized (index) {
               for (K key : keys) {
-                builder.addAll(index.get(key));
+                matches.addAll(index.get(key));
               }
             }
-            return builder.build();
+            return matches;
           }
     };