You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2018/01/31 23:00:08 UTC
aurora git commit: Improve performance of MemTaskStore queries
Repository: aurora
Updated Branches:
refs/heads/master 787ccfed5 -> 858552db0
Improve performance of MemTaskStore queries
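Use `ArrayDeque` rather than the hash-based `ImmutableSet` to collect `fetchTasks` results,
and use imperative loops rather than a functional (`FluentIterable`) pipeline. I arrived at
this result after benchmarking it against some of the other usual suspects (`ArrayList`,
`LinkedList`). As a consequence, `fetchTasks` now returns a `Collection` instead of an
`ImmutableSet`.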
This patch also enables the JMH `gc` and `stack` profilers (more details
[here](http://hg.openjdk.java.net/codetools/jmh/file/25d8b2695bac/jmh-samples/src/main/java/org/openjdk/jmh/samples/JMHSample_35_Profilers.java)),
which provide insight into the heap (allocation) impact of changes. Adding a heap profiler was
the primary motivation for starting this change, and I ended up using it to guide this
improvement.
Reviewed at https://reviews.apache.org/r/65303/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/858552db
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/858552db
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/858552db
Branch: refs/heads/master
Commit: 858552db0f433eca7ae615a961de82fc56717e5d
Parents: 787ccfe
Author: Bill Farner <wf...@apache.org>
Authored: Wed Jan 31 14:59:30 2018 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Wed Jan 31 15:00:01 2018 -0800
----------------------------------------------------------------------
build.gradle | 1 +
.../aurora/benchmark/TaskStoreBenchmarks.java | 45 ++++++++++--
.../scheduler/storage/mem/MemTaskStore.java | 75 +++++++++-----------
3 files changed, 73 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/858552db/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 64af7ae..57355dc 100644
--- a/build.gradle
+++ b/build.gradle
@@ -602,6 +602,7 @@ jmh {
jvmArgsPrepend = '-Xmx3g'
humanOutputFile = project.file("$jmhHumanOutputPath")
resultsFile = project.file("$buildDir/reports/jmh/results.txt")
+ profilers = ['gc', 'stack']
}
tasks.getByName('jmh').doLast() {
println "Benchmark report generated: file://$jmhHumanOutputPath"
http://git-wip-us.apache.org/repos/asf/aurora/blob/858552db/src/jmh/java/org/apache/aurora/benchmark/TaskStoreBenchmarks.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/TaskStoreBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/TaskStoreBenchmarks.java
index 9ec9865..4e4d36b 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/TaskStoreBenchmarks.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/TaskStoreBenchmarks.java
@@ -16,7 +16,6 @@ package org.apache.aurora.benchmark;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-import com.google.common.collect.Iterables;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.util.Modules;
@@ -27,6 +26,7 @@ import org.apache.aurora.common.util.testing.FakeClock;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.TaskStore;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
import org.apache.aurora.scheduler.testing.FakeStatsProvider;
@@ -54,6 +54,7 @@ public class TaskStoreBenchmarks {
@State(Scope.Thread)
public abstract static class AbstractFetchTasksBenchmark {
protected Storage storage;
+ protected IJobKey job;
public abstract void setUp();
@Param({"10000", "50000", "100000"})
@@ -63,6 +64,7 @@ public class TaskStoreBenchmarks {
storage.write((Storage.MutateWork.NoResult.Quiet) storeProvider -> {
TaskStore.Mutable taskStore = storeProvider.getUnsafeTaskStore();
Set<IScheduledTask> tasks = new Tasks.Builder().build(size);
+ job = tasks.stream().findFirst().get().getAssignedTask().getTask().getJob();
taskStore.saveTasks(tasks);
});
}
@@ -75,7 +77,7 @@ public class TaskStoreBenchmarks {
}
}
- public static class MemFetchTasksBenchmark extends AbstractFetchTasksBenchmark {
+ public static class FetchAll extends AbstractFetchTasksBenchmark {
@Setup(Level.Trial)
@Override
public void setUp() {
@@ -105,9 +107,42 @@ public class TaskStoreBenchmarks {
@Benchmark
public int run() {
- // Iterate through results in case the result is lazily computed.
- return Iterables.size(
- storage.read(store -> store.getTaskStore().fetchTasks(Query.unscoped())));
+ return storage.read(store -> store.getTaskStore().fetchTasks(Query.unscoped())).size();
+ }
+ }
+
+ public static class IndexedFetchAndFilter extends AbstractFetchTasksBenchmark {
+ @Setup(Level.Trial)
+ @Override
+ public void setUp() {
+ storage = Guice.createInjector(
+ Modules.combine(
+ new MemStorageModule(),
+ new AbstractModule() {
+ @Override
+ protected void configure() {
+ bind(StatsProvider.class).toInstance(new FakeStatsProvider());
+ bind(Clock.class).toInstance(new FakeClock());
+ }
+ }))
+ .getInstance(Storage.class);
+
+ }
+
+ @Setup(Level.Iteration)
+ public void setUpIteration() {
+ createTasks(numTasks);
+ }
+
+ @TearDown(Level.Iteration)
+ public void tearDownIteration() {
+ deleteTasks();
+ }
+
+ @Benchmark
+ public int run() {
+ return storage.read(
+ store -> store.getTaskStore().fetchTasks(Query.instanceScoped(job, 0))).size();
}
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/858552db/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java
index b59999c..5b10350 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemTaskStore.java
@@ -17,6 +17,9 @@ import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -28,12 +31,9 @@ import javax.inject.Qualifier;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
-import com.google.common.base.Functions;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
import com.google.common.base.Supplier;
-import com.google.common.collect.FluentIterable;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
@@ -130,11 +130,11 @@ class MemTaskStore implements TaskStore.Mutable {
@Timed("mem_storage_fetch_tasks")
@Override
- public ImmutableSet<IScheduledTask> fetchTasks(Query.Builder query) {
+ public Collection<IScheduledTask> fetchTasks(Query.Builder query) {
requireNonNull(query);
long start = System.nanoTime();
- ImmutableSet<IScheduledTask> result = matches(query).toSet();
+ Collection<IScheduledTask> result = matches(query);
long durationNanos = System.nanoTime() - start;
boolean infoLevel = durationNanos >= slowQueryThresholdNanos;
long time = Amount.of(durationNanos, Time.NANOSECONDS).as(Time.MILLISECONDS);
@@ -164,9 +164,9 @@ class MemTaskStore implements TaskStore.Mutable {
"Proposed new tasks would create task ID collision.");
Iterable<Task> canonicalized = Iterables.transform(newTasks, toTask);
- tasks.putAll(Maps.uniqueIndex(canonicalized, TO_ID));
+ tasks.putAll(Maps.uniqueIndex(canonicalized, task -> Tasks.id(task.storedTask)));
for (SecondaryIndex<?> index : secondaryIndices) {
- index.insert(Iterables.transform(canonicalized, TO_SCHEDULED));
+ index.insert(Iterables.transform(canonicalized, task -> task.storedTask));
}
}
@@ -220,27 +220,22 @@ class MemTaskStore implements TaskStore.Mutable {
});
}
- private static Predicate<Task> queryFilter(Query.Builder query) {
- return Predicates.compose(
- Util.queryFilter(query),
- new Function<Task, IScheduledTask>() {
- @Override
- public IScheduledTask apply(Task canonicalTask) {
- return canonicalTask.storedTask;
- }
- });
- }
+ private Collection<IScheduledTask> fromIdIndex(
+ Iterable<String> taskIds,
+ Predicate<IScheduledTask> filter) {
- private Iterable<Task> fromIdIndex(Iterable<String> taskIds) {
- return FluentIterable.from(taskIds)
- .transform(Functions.forMap(tasks, null))
- .filter(Predicates.notNull())
- .toList();
+ Collection<IScheduledTask> result = new ArrayDeque<>();
+ for (String id : taskIds) {
+ Task match = tasks.get(id);
+ if (match != null && filter.apply(match.storedTask)) {
+ result.add(match.storedTask);
+ }
+ }
+ return result;
}
- private FluentIterable<IScheduledTask> matches(Query.Builder query) {
- // Apply the query against the working set.
- Optional<? extends Iterable<Task>> from = Optional.empty();
+ private Collection<IScheduledTask> matches(Query.Builder query) {
+ Predicate<IScheduledTask> filter = Util.queryFilter(query);
if (query.get().getTaskIds().isEmpty()) {
for (SecondaryIndex<?> index : secondaryIndices) {
Optional<Iterable<String>> indexMatch = index.getMatches(query);
@@ -248,31 +243,25 @@ class MemTaskStore implements TaskStore.Mutable {
// Note: we could leverage multiple indexes here if the query applies to them, by
// choosing to intersect the results. Given current indexes and query profile, this is
// unlikely to offer much improvement, though.
- from = Optional.of(fromIdIndex(indexMatch.get()));
- break;
+ return fromIdIndex(indexMatch.get(), filter);
}
}
// No indices match, fall back to a full scan.
- if (!from.isPresent()) {
- taskQueriesAll.incrementAndGet();
- from = Optional.of(tasks.values());
+ taskQueriesAll.incrementAndGet();
+ Collection<IScheduledTask> result = new ArrayDeque<>();
+ for (Task task : tasks.values()) {
+ if (filter.test(task.storedTask)) {
+ result.add(task.storedTask);
+ }
}
+ return Collections.unmodifiableCollection(result);
} else {
taskQueriesById.incrementAndGet();
- from = Optional.of(fromIdIndex(query.get().getTaskIds()));
+ return fromIdIndex(query.get().getTaskIds(), filter);
}
-
- return FluentIterable.from(from.get())
- .filter(queryFilter(query))
- .transform(TO_SCHEDULED);
}
- private static final Function<Task, IScheduledTask> TO_SCHEDULED = task -> task.storedTask;
-
- private static final Function<Task, String> TO_ID =
- Functions.compose(Tasks::id, TO_SCHEDULED);
-
private static class Task {
private final IScheduledTask storedTask;
@@ -388,13 +377,13 @@ class MemTaskStore implements TaskStore.Mutable {
@Override
public Iterable<String> apply(Set<K> keys) {
hitCount.incrementAndGet();
- ImmutableSet.Builder<String> builder = ImmutableSet.builder();
+ Collection<String> matches = new ArrayDeque<>();
synchronized (index) {
for (K key : keys) {
- builder.addAll(index.get(key));
+ matches.addAll(index.get(key));
}
}
- return builder.build();
+ return matches;
}
};