You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ma...@apache.org on 2015/04/28 22:44:12 UTC

aurora git commit: Implementing PendingTaskProcessor benchmark.

Repository: aurora
Updated Branches:
  refs/heads/master 32cd1d56f -> e311dbee7


Implementing PendingTaskProcessor benchmark.

Reviewed at https://reviews.apache.org/r/33458/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/e311dbee
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/e311dbee
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/e311dbee

Branch: refs/heads/master
Commit: e311dbee7f553e77ebdeb086e1711d1df2871bf1
Parents: 32cd1d5
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Tue Apr 28 13:39:29 2015 -0700
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Tue Apr 28 13:39:29 2015 -0700

----------------------------------------------------------------------
 .../aurora/benchmark/BenchmarkSettings.java     | 18 +++---
 .../aurora/benchmark/SchedulingBenchmarks.java  | 65 ++++++++------------
 .../java/org/apache/aurora/benchmark/Tasks.java |  2 -
 .../async/preemptor/PendingTaskProcessor.java   |  3 +-
 .../async/preemptor/PreemptorModule.java        |  1 +
 5 files changed, 39 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/e311dbee/src/jmh/java/org/apache/aurora/benchmark/BenchmarkSettings.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/BenchmarkSettings.java b/src/jmh/java/org/apache/aurora/benchmark/BenchmarkSettings.java
index 8f43bd7..94f8b79 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/BenchmarkSettings.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/BenchmarkSettings.java
@@ -27,18 +27,18 @@ final class BenchmarkSettings {
   private final Set<IHostAttributes> hostAttributes;
   private final double clusterUtilization;
   private final boolean allVictimsEligibleForPreemption;
-  private final IScheduledTask task;
+  private final Set<IScheduledTask> tasks;
 
   private BenchmarkSettings(
       double clusterUtilization,
       boolean allVictimsEligibleForPreemption,
       Set<IHostAttributes> hostAttributes,
-      IScheduledTask task) {
+      Set<IScheduledTask> tasks) {
 
     this.clusterUtilization = clusterUtilization;
     this.allVictimsEligibleForPreemption = allVictimsEligibleForPreemption;
     this.hostAttributes = requireNonNull(hostAttributes);
-    this.task = requireNonNull(task);
+    this.tasks = requireNonNull(tasks);
   }
 
   /**
@@ -74,15 +74,15 @@ final class BenchmarkSettings {
    *
    * @return Task to run a benchmark for.
    */
-  IScheduledTask getTask() {
-    return task;
+  Set<IScheduledTask> getTasks() {
+    return tasks;
   }
 
   static class Builder {
     private double clusterUtilization = 0.9;
     private boolean allVictimsEligibleForPreemption;
     private Set<IHostAttributes> hostAttributes;
-    private IScheduledTask task;
+    private Set<IScheduledTask> tasks;
 
     Builder setClusterUtilization(double newClusterUtilization) {
       clusterUtilization = newClusterUtilization;
@@ -99,8 +99,8 @@ final class BenchmarkSettings {
       return this;
     }
 
-    Builder setTask(IScheduledTask newTask) {
-      task = newTask;
+    Builder setTasks(Set<IScheduledTask> newTasks) {
+      tasks = newTasks;
       return this;
     }
 
@@ -109,7 +109,7 @@ final class BenchmarkSettings {
           clusterUtilization,
           allVictimsEligibleForPreemption,
           hostAttributes,
-          task);
+          tasks);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/e311dbee/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
index 372addc..d9e5199 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
@@ -20,9 +20,6 @@ import java.util.concurrent.TimeUnit;
 
 import javax.inject.Singleton;
 
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
 import com.google.common.eventbus.EventBus;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.inject.AbstractModule;
@@ -47,11 +44,10 @@ import org.apache.aurora.scheduler.async.TaskScheduler;
 import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl.ReservationDuration;
 import org.apache.aurora.scheduler.async.preemptor.BiCache;
 import org.apache.aurora.scheduler.async.preemptor.ClusterStateImpl;
-import org.apache.aurora.scheduler.async.preemptor.Preemptor;
+import org.apache.aurora.scheduler.async.preemptor.PendingTaskProcessor;
 import org.apache.aurora.scheduler.async.preemptor.PreemptorModule;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent;
-import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.filter.SchedulingFilterImpl;
 import org.apache.aurora.scheduler.mesos.Driver;
@@ -59,7 +55,6 @@ import org.apache.aurora.scheduler.mesos.ExecutorSettings;
 import org.apache.aurora.scheduler.state.StateModule;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.db.DbUtil;
-import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.openjdk.jmh.annotations.Benchmark;
@@ -69,6 +64,7 @@ import org.openjdk.jmh.annotations.Level;
 import org.openjdk.jmh.annotations.Measurement;
 import org.openjdk.jmh.annotations.Mode;
 import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
 import org.openjdk.jmh.annotations.Scope;
 import org.openjdk.jmh.annotations.Setup;
 import org.openjdk.jmh.annotations.State;
@@ -92,7 +88,7 @@ public class SchedulingBenchmarks {
     private static final Amount<Long, Time> NO_DELAY = Amount.of(0L, Time.MILLISECONDS);
     private static final Amount<Long, Time> DELAY_FOREVER = Amount.of(30L, Time.DAYS);
     protected Storage storage;
-    protected Preemptor preemptor;
+    protected PendingTaskProcessor pendingTaskProcessor;
     protected ScheduledThreadPoolExecutor executor;
     private TaskScheduler taskScheduler;
     private OfferManager offerManager;
@@ -171,7 +167,7 @@ public class SchedulingBenchmarks {
 
       taskScheduler = injector.getInstance(TaskScheduler.class);
       offerManager = injector.getInstance(OfferManager.class);
-      preemptor = injector.getInstance(Preemptor.class);
+      pendingTaskProcessor = injector.getInstance(PendingTaskProcessor.class);
       eventBus.register(injector.getInstance(ClusterStateImpl.class));
 
       settings = getSettings();
@@ -181,7 +177,7 @@ public class SchedulingBenchmarks {
       Offers.addOffers(offerManager, offers);
       fillUpCluster(offers.size());
 
-      saveTasks(ImmutableSet.of(settings.getTask()));
+      saveTasks(settings.getTasks());
     }
 
     @Setup(Level.Iteration)
@@ -238,7 +234,11 @@ public class SchedulingBenchmarks {
      */
     @Benchmark
     public boolean runBenchmark() {
-      return taskScheduler.schedule(settings.getTask().getAssignedTask().getTaskId());
+      boolean result = false;
+      for (IScheduledTask task : settings.getTasks()) {
+        result = taskScheduler.schedule(task.getAssignedTask().getTaskId());
+      }
+      return result;
     }
   }
 
@@ -250,10 +250,10 @@ public class SchedulingBenchmarks {
     protected BenchmarkSettings getSettings() {
       return new BenchmarkSettings.Builder()
           .setHostAttributes(new Hosts.Builder().setNumHostsPerRack(2).build(1000))
-          .setTask(Iterables.getOnlyElement(new Tasks.Builder()
+          .setTasks(new Tasks.Builder()
               .setProduction(true)
               .setCpu(32)
-              .build(1))).build();
+              .build(1)).build();
     }
   }
 
@@ -265,10 +265,10 @@ public class SchedulingBenchmarks {
     protected BenchmarkSettings getSettings() {
       return new BenchmarkSettings.Builder()
           .setHostAttributes(new Hosts.Builder().setNumHostsPerRack(2).build(1000))
-          .setTask(Iterables.getOnlyElement(new Tasks.Builder()
+          .setTasks(new Tasks.Builder()
               .setProduction(true)
               .addValueConstraint("host", "denied")
-              .build(1))).build();
+              .build(1)).build();
     }
   }
 
@@ -280,10 +280,10 @@ public class SchedulingBenchmarks {
     protected BenchmarkSettings getSettings() {
       return new BenchmarkSettings.Builder()
           .setHostAttributes(new Hosts.Builder().setNumHostsPerRack(2).build(1000))
-          .setTask(Iterables.getOnlyElement(new Tasks.Builder()
+          .setTasks(new Tasks.Builder()
               .setProduction(true)
               .addLimitConstraint("host", 0)
-              .build(1))).build();
+              .build(1)).build();
     }
   }
 
@@ -298,10 +298,10 @@ public class SchedulingBenchmarks {
           .setClusterUtilization(1.0)
           .setVictimPreemptionEligibilty(true)
           .setHostAttributes(new Hosts.Builder().setNumHostsPerRack(2).build(10000))
-          .setTask(Iterables.getOnlyElement(new Tasks.Builder()
+          .setTasks(new Tasks.Builder()
               .setProduction(true)
               .addLimitConstraint("host", 0)
-              .build(1))).build();
+              .build(1)).build();
     }
   }
 
@@ -309,36 +309,25 @@ public class SchedulingBenchmarks {
    * Tests preemptor searching for a preemption slot in a completely filled up cluster.
    */
   public static class PreemptorSlotSearchBenchmark extends AbstractBase {
+    @Param({"1", "10", "100", "1000"})
+    public int numPendingTasks;
+
     @Override
     protected BenchmarkSettings getSettings() {
       return new BenchmarkSettings.Builder()
           .setClusterUtilization(1.0)
-          .setHostAttributes(new Hosts.Builder().setNumHostsPerRack(2).build(1000))
-          .setTask(Iterables.getOnlyElement(new Tasks.Builder()
+          .setHostAttributes(new Hosts.Builder().setNumHostsPerRack(2).build(10000))
+          .setTasks(new Tasks.Builder()
               .setProduction(true)
               .addValueConstraint("host", "denied")
-              .build(1))).build();
+              .build(numPendingTasks)).build();
     }
 
     @Override
     public boolean runBenchmark() {
-      return storage.write(new Storage.MutateWork.Quiet<Boolean>() {
-        @Override
-        public Boolean apply(final Storage.MutableStoreProvider storeProvider) {
-          IAssignedTask assignedTask = getSettings().getTask().getAssignedTask();
-          AttributeAggregate aggregate =
-              AttributeAggregate.getJobActiveState(storeProvider, assignedTask.getTask().getJob());
-          Optional<String> result =
-              preemptor.attemptPreemptionFor(assignedTask, aggregate, storeProvider);
-
-          while (executor.getActiveCount() > 0) {
-            // Using a tight loop to wait for a search completion. This is executed on a benchmark
-            // main thread and does not affect test results.
-          }
-
-          return result.isPresent();
-        }
-      });
+      pendingTaskProcessor.run();
+      // Return non-guessable result to satisfy "blackhole" requirement.
+      return System.currentTimeMillis() % 5 == 0;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/e311dbee/src/jmh/java/org/apache/aurora/benchmark/Tasks.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/Tasks.java b/src/jmh/java/org/apache/aurora/benchmark/Tasks.java
index b4da057..d3e1295 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/Tasks.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/Tasks.java
@@ -45,8 +45,6 @@ final class Tasks {
    * Builds tasks for the specified configuration.
    */
   static final class Builder {
-    private static final String USER_FORMAT = "user-%s";
-
     private JobKey jobKey = new JobKey("jmh", "dev", "benchmark");
     private int uuidStart = 0;
     private boolean isProduction = false;

http://git-wip-us.apache.org/repos/asf/aurora/blob/e311dbee/src/main/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessor.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessor.java
index 4427115..c1114a4 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessor.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessor.java
@@ -72,7 +72,8 @@ import static org.apache.aurora.scheduler.base.Tasks.SCHEDULED_TO_ASSIGNED;
 /**
  * Attempts to find preemption slots for all PENDING tasks eligible for preemption.
  */
-class PendingTaskProcessor implements Runnable {
+@VisibleForTesting
+public class PendingTaskProcessor implements Runnable {
   private final Storage storage;
   private final OfferManager offerManager;
   private final PreemptionVictimFilter preemptionVictimFilter;

http://git-wip-us.apache.org/repos/asf/aurora/blob/e311dbee/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java
index 156bac2..3d9e27b 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java
@@ -114,6 +114,7 @@ public class PreemptorModule extends AbstractModule {
                   slotSearchInterval.getUnit().getTimeUnit()));
 
           expose(PreemptorService.class);
+          expose(PendingTaskProcessor.class);
         } else {
           bind(Preemptor.class).toInstance(NULL_PREEMPTOR);
           LOG.warning("Preemptor Disabled.");