You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ma...@apache.org on 2014/04/24 01:57:17 UTC

git commit: Implementing Scheduler SLA metrics.

Repository: incubator-aurora
Updated Branches:
  refs/heads/master 4e42252be -> 53dc494ff


Implementing Scheduler SLA metrics.

High level overview:
- MetricCalculator runs periodically (every minute), pulls all task history,
  packages it into SlaInstance list and updates stats;
- Stat calculation is handled by pairing an SlaAlgorithm with a set of
  applicable SlaGroups (logical groupings by job, cluster, resource, etc.);
- Stat name is generated by combining group and algorithm name parts.

Bugs closed: AURORA-293

Reviewed at https://reviews.apache.org/r/20398/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/53dc494f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/53dc494f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/53dc494f

Branch: refs/heads/master
Commit: 53dc494ff2dba934468e32524824aef13f2efc73
Parents: 4e42252
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Wed Apr 23 16:55:07 2014 -0700
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Wed Apr 23 16:55:07 2014 -0700

----------------------------------------------------------------------
 .../apache/aurora/scheduler/app/AppModule.java  |   2 +
 .../scheduler/async/RescheduleCalculator.java   |  16 +-
 .../scheduler/base/ResourceAggregates.java      |  79 ++++
 .../org/apache/aurora/scheduler/base/Tasks.java |   8 +
 .../aurora/scheduler/quota/QuotaManager.java    |   1 +
 .../scheduler/quota/ResourceAggregates.java     |  66 ---
 .../aurora/scheduler/sla/MetricCalculator.java  | 170 ++++++++
 .../aurora/scheduler/sla/SlaAlgorithm.java      | 437 +++++++++++++++++++
 .../apache/aurora/scheduler/sla/SlaGroup.java   | 174 ++++++++
 .../apache/aurora/scheduler/sla/SlaModule.java  | 116 +++++
 .../apache/aurora/scheduler/sla/SlaUtil.java    |  50 +++
 .../aurora/scheduler/stats/SlotSizeCounter.java |  11 +-
 .../scheduler/quota/QuotaManagerImplTest.java   |   1 +
 .../scheduler/sla/MetricCalculatorTest.java     |  66 +++
 .../aurora/scheduler/sla/SlaAlgorithmTest.java  | 291 ++++++++++++
 .../aurora/scheduler/sla/SlaTestUtil.java       |  65 +++
 .../scheduler/stats/SlotSizeCounterTest.java    |   2 +-
 .../storage/log/SnapshotStoreImplTest.java      |   2 +-
 18 files changed, 1471 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/53dc494f/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/AppModule.java b/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
index 2bedf9b..3dc15ad 100644
--- a/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
@@ -53,6 +53,7 @@ import org.apache.aurora.scheduler.filter.SchedulingFilterImpl;
 import org.apache.aurora.scheduler.http.ServletModule;
 import org.apache.aurora.scheduler.metadata.MetadataModule;
 import org.apache.aurora.scheduler.quota.QuotaModule;
+import org.apache.aurora.scheduler.sla.SlaModule;
 import org.apache.aurora.scheduler.state.StateModule;
 import org.apache.aurora.scheduler.stats.AsyncStatsModule;
 import org.apache.aurora.scheduler.storage.entities.IServerInfo;
@@ -116,6 +117,7 @@ class AppModule extends AbstractModule {
     install(new ServletModule());
     install(new SchedulerModule());
     install(new StateModule());
+    install(new SlaModule());
 
     bind(StatsProvider.class).toInstance(Stats.STATS_PROVIDER);
   }

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/53dc494f/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java b/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java
index bc77bf2..4f79f32 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java
@@ -22,7 +22,6 @@ import java.util.logging.Logger;
 
 import javax.inject.Inject;
 
-import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
@@ -82,14 +81,6 @@ public interface RescheduleCalculator {
     private static final Predicate<ScheduleStatus> IS_ACTIVE_STATUS =
         Predicates.in(Tasks.ACTIVE_STATES);
 
-    private static final Function<ITaskEvent, ScheduleStatus> TO_STATUS =
-        new Function<ITaskEvent, ScheduleStatus>() {
-          @Override
-          public ScheduleStatus apply(ITaskEvent input) {
-            return input.getStatus();
-          }
-        };
-
     private static final Set<ScheduleStatus> INTERRUPTED_TASK_STATES =
         EnumSet.of(RESTARTING, KILLING, DRAINING);
 
@@ -104,7 +95,7 @@ public interface RescheduleCalculator {
 
         // Avoid penalizing tasks that were interrupted by outside action, such as a user
         // restarting them.
-        if (Iterables.any(Iterables.transform(events, TO_STATUS),
+        if (Iterables.any(Iterables.transform(events, Tasks.TASK_EVENT_TO_STATUS),
             Predicates.in(INTERRUPTED_TASK_STATES))) {
           return false;
         }
@@ -113,8 +104,9 @@ public interface RescheduleCalculator {
         ScheduleStatus terminalState = terminalEvent.getStatus();
         Preconditions.checkState(Tasks.isTerminated(terminalState));
 
-        ITaskEvent activeEvent =
-            Iterables.find(events, Predicates.compose(IS_ACTIVE_STATUS, TO_STATUS));
+        ITaskEvent activeEvent = Iterables.find(
+            events,
+            Predicates.compose(IS_ACTIVE_STATUS, Tasks.TASK_EVENT_TO_STATUS));
 
         long thresholdMs = settings.flappingTaskThreashold.as(Time.MILLISECONDS);
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/53dc494f/src/main/java/org/apache/aurora/scheduler/base/ResourceAggregates.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/ResourceAggregates.java b/src/main/java/org/apache/aurora/scheduler/base/ResourceAggregates.java
new file mode 100644
index 0000000..7804300
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/base/ResourceAggregates.java
@@ -0,0 +1,79 @@
+/**
+ * Copyright 2013 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.base;
+
+import com.google.common.collect.Ordering;
+
+import org.apache.aurora.gen.ResourceAggregate;
+import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
+
+/**
+ * Convenience class for normalizing resource measures between tasks and offers.
+ */
+public final class ResourceAggregates {
+
+  public static final IResourceAggregate EMPTY =
+      IResourceAggregate.build(new ResourceAggregate(0, 0, 0));
+
+  public static final IResourceAggregate SMALL =
+      IResourceAggregate.build(new ResourceAggregate(1.0, 1024, 4096));
+
+  public static final IResourceAggregate MEDIUM =
+      IResourceAggregate.build(new ResourceAggregate(4.0, 8192, 16384));
+
+  public static final IResourceAggregate LARGE =
+      IResourceAggregate.build(new ResourceAggregate(8.0, 16384, 32768));
+
+  public static final IResourceAggregate XLARGE =
+      IResourceAggregate.build(new ResourceAggregate(16.0, 32768, 65536));
+
+  private ResourceAggregates() {
+    // Utility class.
+  }
+
+  /**
+   * Returns a quota with all resource vectors zeroed.
+   *
+   * @return A resource aggregate with all resource vectors zeroed.
+   */
+  public static IResourceAggregate none() {
+    return EMPTY;
+  }
+
+  /**
+   * a * m
+   */
+  public static IResourceAggregate scale(IResourceAggregate a, int m) {
+    return IResourceAggregate.build(new ResourceAggregate()
+        .setNumCpus(a.getNumCpus() * m)
+        .setRamMb(a.getRamMb() * m)
+        .setDiskMb(a.getDiskMb() * m));
+  }
+
+  /**
+   * a / b
+   * <p>
+   * This calculates how many times {@code b} "fits into" {@code a}.  Behavior is undefined when
+   * {@code b} contains resources with a value of zero.
+   */
+  public static int divide(IResourceAggregate a, IResourceAggregate b) {
+    return Ordering.natural().min(
+        a.getNumCpus() / b.getNumCpus(),
+        (double) a.getRamMb() / b.getRamMb(),
+        (double) a.getDiskMb() / b.getDiskMb()
+    ).intValue();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/53dc494f/src/main/java/org/apache/aurora/scheduler/base/Tasks.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/Tasks.java b/src/main/java/org/apache/aurora/scheduler/base/Tasks.java
index fae2d23..e5cc469 100644
--- a/src/main/java/org/apache/aurora/scheduler/base/Tasks.java
+++ b/src/main/java/org/apache/aurora/scheduler/base/Tasks.java
@@ -119,6 +119,14 @@ public final class Tasks {
   public static final Function<IScheduledTask, String> SCHEDULED_TO_SLAVE_HOST =
       Functions.compose(ASSIGNED_TO_SLAVE_HOST, SCHEDULED_TO_ASSIGNED);
 
+  public static final Function<ITaskEvent, ScheduleStatus> TASK_EVENT_TO_STATUS =
+      new Function<ITaskEvent, ScheduleStatus>() {
+        @Override
+        public ScheduleStatus apply(ITaskEvent input) {
+          return input.getStatus();
+        }
+      };
+
   /**
    * Different states that an active task may be in.
    */

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/53dc494f/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java b/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
index 56f47b6..ea86d52 100644
--- a/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
@@ -23,6 +23,7 @@ import com.google.inject.Inject;
 import org.apache.aurora.gen.ResourceAggregate;
 import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.ResourceAggregates;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.StoreProvider;

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/53dc494f/src/main/java/org/apache/aurora/scheduler/quota/ResourceAggregates.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/quota/ResourceAggregates.java b/src/main/java/org/apache/aurora/scheduler/quota/ResourceAggregates.java
deleted file mode 100644
index 444c287..0000000
--- a/src/main/java/org/apache/aurora/scheduler/quota/ResourceAggregates.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Copyright 2013 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.quota;
-
-import com.google.common.collect.Ordering;
-
-import org.apache.aurora.gen.ResourceAggregate;
-import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
-
-/**
- * Convenience class for normalizing resource measures between tasks and offers.
- */
-public final class ResourceAggregates {
-  private static final IResourceAggregate EMPTY_RESOURCE_AGGREGATE =
-      IResourceAggregate.build(new ResourceAggregate(0, 0, 0));
-
-  private ResourceAggregates() {
-    // Utility class.
-  }
-
-  /**
-   * Returns a quota with all resource vectors zeroed.
-   *
-   * @return A resource aggregate with all resource vectors zeroed.
-   */
-  public static IResourceAggregate none() {
-    return EMPTY_RESOURCE_AGGREGATE;
-  }
-
-  /**
-   * a * m
-   */
-  public static IResourceAggregate scale(IResourceAggregate a, int m) {
-    return IResourceAggregate.build(new ResourceAggregate()
-        .setNumCpus(a.getNumCpus() * m)
-        .setRamMb(a.getRamMb() * m)
-        .setDiskMb(a.getDiskMb() * m));
-  }
-
-  /**
-   * a / b
-   * <p>
-   * This calculates how many times {@code b} "fits into" {@code a}.  Behavior is undefined when
-   * {@code b} contains resources with a value of zero.
-   */
-  public static int divide(IResourceAggregate a, IResourceAggregate b) {
-    return Ordering.natural().min(
-        a.getNumCpus() / b.getNumCpus(),
-        (double) a.getRamMb() / b.getRamMb(),
-        (double) a.getDiskMb() / b.getDiskMb()
-    ).intValue();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/53dc494f/src/main/java/org/apache/aurora/scheduler/sla/MetricCalculator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/sla/MetricCalculator.java b/src/main/java/org/apache/aurora/scheduler/sla/MetricCalculator.java
new file mode 100644
index 0000000..bc90030
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/sla/MetricCalculator.java
@@ -0,0 +1,170 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.sla;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.inject.Inject;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.base.Supplier;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Range;
+
+import com.twitter.common.inject.TimedInterceptor.Timed;
+import com.twitter.common.stats.StatsProvider;
+import com.twitter.common.util.Clock;
+
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.sla.SlaAlgorithm.AlgorithmType;
+import org.apache.aurora.scheduler.sla.SlaGroup.GroupType;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import static org.apache.aurora.scheduler.sla.SlaAlgorithm.AlgorithmType.AGGREGATE_PLATFORM_UPTIME;
+import static org.apache.aurora.scheduler.sla.SlaAlgorithm.AlgorithmType.JOB_UPTIME_50;
+import static org.apache.aurora.scheduler.sla.SlaAlgorithm.AlgorithmType.JOB_UPTIME_75;
+import static org.apache.aurora.scheduler.sla.SlaAlgorithm.AlgorithmType.JOB_UPTIME_90;
+import static org.apache.aurora.scheduler.sla.SlaAlgorithm.AlgorithmType.JOB_UPTIME_95;
+import static org.apache.aurora.scheduler.sla.SlaAlgorithm.AlgorithmType.JOB_UPTIME_99;
+import static org.apache.aurora.scheduler.sla.SlaAlgorithm.AlgorithmType.MEDIAN_TIME_TO_ASSIGNED;
+import static org.apache.aurora.scheduler.sla.SlaAlgorithm.AlgorithmType.MEDIAN_TIME_TO_RUNNING;
+import static org.apache.aurora.scheduler.sla.SlaGroup.GroupType.CLUSTER;
+import static org.apache.aurora.scheduler.sla.SlaGroup.GroupType.JOB;
+import static org.apache.aurora.scheduler.sla.SlaGroup.GroupType.RESOURCE_CPU;
+import static org.apache.aurora.scheduler.sla.SlaGroup.GroupType.RESOURCE_DISK;
+import static org.apache.aurora.scheduler.sla.SlaGroup.GroupType.RESOURCE_RAM;
+
+/**
+ * Responsible for calculating and exporting SLA metrics.
+ */
+class MetricCalculator implements Runnable {
+
+  private static final Multimap<AlgorithmType, GroupType> METRICS =
+      ImmutableMultimap.<AlgorithmType, GroupType>builder()
+          .put(JOB_UPTIME_50, JOB)
+          .put(JOB_UPTIME_75, JOB)
+          .put(JOB_UPTIME_90, JOB)
+          .put(JOB_UPTIME_95, JOB)
+          .put(JOB_UPTIME_99, JOB)
+          .putAll(AGGREGATE_PLATFORM_UPTIME, JOB, CLUSTER)
+          .putAll(MEDIAN_TIME_TO_ASSIGNED, JOB, CLUSTER, RESOURCE_CPU, RESOURCE_RAM, RESOURCE_DISK)
+          .putAll(MEDIAN_TIME_TO_RUNNING, JOB, CLUSTER, RESOURCE_CPU, RESOURCE_RAM, RESOURCE_DISK)
+          .build();
+
+  private static final Predicate<ITaskConfig> IS_SERVICE =
+      new Predicate<ITaskConfig>() {
+        @Override
+        public boolean apply(ITaskConfig task) {
+          return task.isIsService();
+        }
+      };
+
+  private final LoadingCache<String, Counter> metricCache;
+  private final Storage storage;
+  private final Clock clock;
+  private final MetricCalculatorSettings settings;
+
+  static class MetricCalculatorSettings {
+    private final long refreshRateMs;
+
+    MetricCalculatorSettings(long refreshRateMs) {
+      this.refreshRateMs = refreshRateMs;
+    }
+  }
+
+  private static class Counter implements Supplier<Number> {
+    private final AtomicReference<Number> value = new AtomicReference<>((Number) 0);
+    private final StatsProvider statsProvider;
+    private boolean exported;
+
+    Counter(StatsProvider statsProvider) {
+      this.statsProvider = statsProvider;
+    }
+
+    @Override
+    public Number get() {
+      return value.get();
+    }
+
+    private void set(String name, Number newValue) {
+      if (!exported) {
+        statsProvider.makeGauge(name, this);
+        exported = true;
+      }
+      value.set(newValue);
+    }
+  }
+
+  @Inject
+  MetricCalculator(
+      Storage storage,
+      Clock clock,
+      MetricCalculatorSettings settings,
+      final StatsProvider statsProvider) {
+
+    this.storage = checkNotNull(storage);
+    this.clock = checkNotNull(clock);
+    this.settings = checkNotNull(settings);
+
+    checkNotNull(statsProvider);
+    this.metricCache = CacheBuilder.newBuilder().build(
+        new CacheLoader<String, Counter>() {
+          public Counter load(String key) {
+            return new Counter(statsProvider);
+          }
+        });
+  }
+
+  @Timed("sla_stats_computation")
+  @Override
+  public void run() {
+    List<IScheduledTask> tasks =
+        FluentIterable.from(Storage.Util.weaklyConsistentFetchTasks(storage, Query.unscoped()))
+            .filter(Predicates.compose(
+                Predicates.and(Tasks.IS_PRODUCTION, IS_SERVICE),
+                Tasks.SCHEDULED_TO_INFO)).toList();
+
+    long nowMs = clock.nowMillis();
+    long intervalStartMs = nowMs - settings.refreshRateMs;
+
+    for (Entry<AlgorithmType, GroupType> slaMetric : METRICS.entries()) {
+      for (Entry<String, Collection<IScheduledTask>> namedGroup
+          : slaMetric.getValue().getSlaGroup().createNamedGroups(tasks).asMap().entrySet()) {
+
+        AlgorithmType algoType = slaMetric.getKey();
+        String metricName = namedGroup.getKey() + algoType.getAlgorithmName();
+        metricCache.getUnchecked(metricName)
+            .set(metricName, algoType.getAlgorithm().calculate(
+                namedGroup.getValue(),
+                Range.closedOpen(intervalStartMs, nowMs)));
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/53dc494f/src/main/java/org/apache/aurora/scheduler/sla/SlaAlgorithm.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/sla/SlaAlgorithm.java b/src/main/java/org/apache/aurora/scheduler/sla/SlaAlgorithm.java
new file mode 100644
index 0000000..81efb2d
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/sla/SlaAlgorithm.java
@@ -0,0 +1,437 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.sla;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+import com.google.common.base.Objects;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.Ordering;
+import com.google.common.collect.Range;
+import com.twitter.common.collections.Pair;
+
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.scheduler.base.JobKeys;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskEvent;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Defines an SLA algorithm to be applied to a {@link IScheduledTask}
+ * set for calculating a specific SLA metric.
+ */
+interface SlaAlgorithm {
+
+  /**
+   * Applies this algorithm to a set of {@link IScheduledTask} to
+   * produce a named metric value over the specified time frame.
+   *
+   * @param tasks Set of tasks to apply this algorithm to.
+   * @param timeFrame Relevant time frame.
+   * @return Produced metric value.
+   */
+  Number calculate(Iterable<IScheduledTask> tasks, Range<Long> timeFrame);
+
+  /**
+   * Pre-configured SLA algorithms.
+   */
+  enum AlgorithmType {
+
+    JOB_UPTIME_99(new JobUptime(99f), String.format(JobUptime.NAME_FORMAT, 99f)),
+    JOB_UPTIME_95(new JobUptime(95f), String.format(JobUptime.NAME_FORMAT, 95f)),
+    JOB_UPTIME_90(new JobUptime(90f), String.format(JobUptime.NAME_FORMAT, 90f)),
+    JOB_UPTIME_75(new JobUptime(75f), String.format(JobUptime.NAME_FORMAT, 75f)),
+    JOB_UPTIME_50(new JobUptime(50f), String.format(JobUptime.NAME_FORMAT, 50f)),
+    AGGREGATE_PLATFORM_UPTIME(new AggregatePlatformUptime(), "platform_uptime_percent"),
+    MEDIAN_TIME_TO_ASSIGNED(new MedianAlgorithm(ScheduleStatus.ASSIGNED), "mtta_ms"),
+    MEDIAN_TIME_TO_RUNNING(new MedianAlgorithm(ScheduleStatus.RUNNING), "mttr_ms");
+
+    private final SlaAlgorithm algorithm;
+    private final String name;
+
+    AlgorithmType(SlaAlgorithm algorithm, String name) {
+      this.algorithm = algorithm;
+      this.name = name;
+    }
+
+    SlaAlgorithm getAlgorithm() {
+      return algorithm;
+    }
+
+    String getAlgorithmName() {
+      return name;
+    }
+  }
+
+  /**
+   * Median time to status SLA algorithm.
+   * Represents the median time spent waiting for a set of tasks to reach specified status.
+   * A combined metric that helps track how task scheduling performance depends on the
+   * requested resources (user scope) as well as on the internal scheduler bin-packing algorithm
+   * efficiency (platform scope).
+   * <p/>
+   * Median time calculated as:
+   * <pre>
+   *    MT =  MEDIAN(Wait_times)
+   * where:
+   *    Wait_times - a collection of time intervals between PENDING and specified task state.
+   *</pre>
+   */
+  final class MedianAlgorithm implements SlaAlgorithm {
+
+    private final ScheduleStatus status;
+
+    private MedianAlgorithm(ScheduleStatus status) {
+      this.status = status;
+    }
+
+    @Override
+    public Number calculate(Iterable<IScheduledTask> tasks, Range<Long> timeFrame) {
+      Iterable<IScheduledTask> activeTasks = FluentIterable.from(tasks)
+          .filter(Predicates.compose(Predicates.in(Tasks.ACTIVE_STATES), Tasks.GET_STATUS));
+
+      List<Long> waitTimes = Lists.newLinkedList();
+      for (IScheduledTask task : activeTasks) {
+        long pendingTs = 0;
+        for (ITaskEvent event : task.getTaskEvents()) {
+          if (event.getStatus() == ScheduleStatus.PENDING) {
+            pendingTs = event.getTimestamp();
+          } else if (event.getStatus() == status) {
+
+            if (pendingTs == 0) {
+              throw new IllegalArgumentException("SLA: missing PENDING status for:"
+                  + task.getAssignedTask().getTaskId());
+            }
+
+            waitTimes.add(event.getTimestamp() - pendingTs);
+            break;
+          }
+        }
+      }
+
+     return SlaUtil.percentile(waitTimes, 50.0);
+    }
+  }
+
+  /**
+   * Job uptime SLA algorithm.
+   * Represents the percentage of instances considered to be in running state for
+   * the specified duration relative to SLA calculation time.
+   */
+  final class JobUptime implements SlaAlgorithm {
+
+    private static final String NAME_FORMAT = "job_uptime_%.2f_sec";
+    private final float percentile;
+
+    private static final Predicate<IScheduledTask> IS_RUNNING =
+        Predicates.compose(
+            Predicates.in(ImmutableSet.of(ScheduleStatus.RUNNING)),
+            Tasks.GET_STATUS);
+
+    private static final Function<IScheduledTask, ITaskEvent> TASK_TO_EVENT =
+        new Function<IScheduledTask, ITaskEvent>() {
+          @Override
+          public ITaskEvent apply(IScheduledTask task) {
+            return Tasks.getLatestEvent(task);
+          }
+        };
+
+    private JobUptime(float percentile) {
+      this.percentile = percentile;
+    }
+
+    @Override
+    public Number calculate(Iterable<IScheduledTask> tasks, final Range<Long> timeFrame) {
+      List<Long> uptimes = FluentIterable.from(tasks)
+          .filter(IS_RUNNING)
+          .transform(Functions.compose(
+              new Function<ITaskEvent, Long>() {
+                @Override
+                public Long apply(ITaskEvent event) {
+                  return timeFrame.upperEndpoint() - event.getTimestamp();
+                }
+              },
+              TASK_TO_EVENT)).toList();
+
+      return (int) Math.floor((double) SlaUtil.percentile(uptimes, percentile) / 1000);
+    }
+  }
+
+  /**
+   * Aggregate Platform Uptime SLA algorithm.
+   * Aggregate amount of runnable time a platform managed to deliver for a set of tasks from the
+   * moment they reach RUNNING status. Excludes any time a task is not in a runnable state
+   * due to user activities (e.g. newly created waiting for host assignment or restarted/killed
+   * by the user).
+   * <p/>
+   * Aggregate platform uptime calculated as:
+   * <pre>
+   *    APU = SUM(Up_time) / SUM(SI - Removed_time)
+   * where:
+   *    Up_time - the aggregate instance UP time over the sampling interval (SI);
+   *    SI - sampling interval (e.g. 1 minute);
+   *    Removed_time - the aggregate instance REMOVED time over the sampling interval.
+   * </pre>
+   */
+  final class AggregatePlatformUptime implements SlaAlgorithm {
+
+    /**
+     * Task platform SLA state.
+     */
+    enum SlaState {
+      /**
+       * Starts a period when the task is not expected to be UP due to user initiated action
+       * or failure.
+       * <p/>
+       * This period is ignored for the calculation purposes.
+       */
+      REMOVED,
+
+      /**
+       * Starts a period when the task cannot reach the UP state for some non-user-related reason.
+       * <p/>
+       * Only platform-incurred task state transitions are considered here. If a task is newly
+       * created (e.g. by job create/update) the amount of time a task spends to reach its UP
+       * state is not counted towards platform downtime. For example, a newly added PENDING task
+       * is considered as REMOVED, whereas a PENDING task rescheduled from LOST will be considered
+       * as DOWN. This approach ensures this metric is not sensitive to user-initiated activities
+       * and is a true reflection of the system recovery performance.
+       */
+      DOWN,
+
+      /**
+       * Starts a period when the task is considered to be up and running from the Aurora
+       * platform standpoint.
+       * <p/>
+       * Note: The platform uptime does not necessarily equate to the real application
+       * availability. This is because a hosted application needs time to deploy, initialize,
+       * and start executing.
+       */
+      UP
+    }
+
+    private static class Interval {
+      private final SlaState state;
+      private final Range<Long> range;
+
+      Interval(SlaState state, long start, long end) {
+        this.state = state;
+        range = Range.closedOpen(start, end);
+      }
+    }
+
+    private static class InstanceId {
+      private final IJobKey jobKey;
+      private final int id;
+
+      InstanceId(IJobKey jobKey, int instanceId) {
+        this.jobKey = checkNotNull(jobKey);
+        this.id = instanceId;
+      }
+
+      @Override
+      public boolean equals(Object o) {
+        if (!(o instanceof InstanceId)) {
+          return false;
+        }
+
+        InstanceId other = (InstanceId) o;
+        return Objects.equal(jobKey, other.jobKey) && Objects.equal(id, other.id);
+      }
+
+      @Override
+      public int hashCode() {
+        return Objects.hashCode(jobKey, id);
+      }
+    }
+
+    private static final Function<IScheduledTask, InstanceId> TO_ID =
+        new Function<IScheduledTask, InstanceId>() {
+          @Override
+          public InstanceId apply(IScheduledTask task) {
+            return new InstanceId(
+                JobKeys.from(task.getAssignedTask().getTask()),
+                task.getAssignedTask().getInstanceId());
+          }
+        };
+
+    private static final Function<ITaskEvent, Long> TASK_EVENT_TO_TIMESTAMP =
+        new Function<ITaskEvent, Long>() {
+          @Override
+          public Long apply(ITaskEvent taskEvent) {
+            return taskEvent.getTimestamp();
+          }
+        };
+
+    /**
+     * Merges the task events of all tasks recorded for one instance into a single list
+     * sorted by event timestamp in ascending order.
+     */
+    private static final Function<Collection<IScheduledTask>, List<ITaskEvent>>
+        TASKS_TO_SORTED_TASK_EVENTS = new Function<Collection<IScheduledTask>, List<ITaskEvent>>() {
+      @Override
+      public List<ITaskEvent> apply(Collection<IScheduledTask> tasks) {
+        List<ITaskEvent> allEvents = Lists.newLinkedList();
+        for (IScheduledTask task : tasks) {
+          allEvents.addAll(task.getTaskEvents());
+        }
+
+        Ordering<ITaskEvent> byTimestamp =
+            Ordering.natural().onResultOf(TASK_EVENT_TO_TIMESTAMP);
+        return byTimestamp.immutableSortedCopy(allEvents);
+      }
+    };
+
+    /**
+     * Convert instance history into the {@link SlaState} based {@link Interval} list.
+     * <p/>
+     * The timeline starts in the {@link SlaState#REMOVED} state at timestamp 0. Every event
+     * either continues the interval in progress or terminates it and opens a new one at the
+     * event timestamp. The interval still in progress after the last event is emitted with
+     * {@code Long.MAX_VALUE} as its end, so the returned list is never empty.
+     * <p/>
+     * NOTE(review): events are presumably expected in ascending timestamp order (as produced by
+     * {@code TASKS_TO_SORTED_TASK_EVENTS}) - confirm before feeding unsorted input.
+     *
+     * Throws {@link IllegalArgumentException} on an event status that is not handled below.
+     */
+    private static final Function<List<ITaskEvent>, List<Interval>> TASK_EVENTS_TO_INTERVALS =
+        new Function<List<ITaskEvent>, List<Interval>>() {
+          @Override
+          public List<Interval> apply(List<ITaskEvent> events) {
+
+            ImmutableList.Builder<Interval> intervals = ImmutableList.builder();
+            Pair<SlaState, Long> current = Pair.of(SlaState.REMOVED, 0L);
+
+            for (ITaskEvent event : events) {
+              long timestamp = event.getTimestamp();
+
+              // Event status in the instance timeline signifies either of the following:
+              // - termination of the existing SlaState interval AND start of a new one;
+              // - continuation of the existing matching SlaState interval.
+              switch (event.getStatus()) {
+                case LOST:
+                case DRAINING:
+                case PREEMPTING:
+                  current = updateIntervals(timestamp, SlaState.DOWN, current, intervals);
+                  break;
+
+                case PENDING:
+                case ASSIGNED:
+                case STARTING:
+                  if (current.getFirst() != SlaState.DOWN) { // A DOWN interval is left running.
+                    current = updateIntervals(timestamp, SlaState.REMOVED, current, intervals);
+                  }
+                  break;
+
+                case THROTTLED:
+                case FINISHED:
+                case RESTARTING:
+                case FAILED:
+                case KILLING:
+                  current = updateIntervals(timestamp, SlaState.REMOVED, current, intervals);
+                  break;
+
+                case RUNNING:
+                  current = updateIntervals(timestamp, SlaState.UP, current, intervals);
+                  break;
+
+                case KILLED:
+                  if (current.getFirst() == SlaState.UP) { // KILLED only matters while UP.
+                    current = updateIntervals(timestamp, SlaState.DOWN, current, intervals);
+                  }
+                  break;
+
+                case INIT:
+                case SANDBOX_DELETED:
+                  // Ignore: these statuses neither terminate nor start an interval.
+                  break;
+
+                default:
+                  throw new IllegalArgumentException("Unsupported status:" + event.getStatus());
+              }
+            }
+            // Add the last event interval, left open-ended up to Long.MAX_VALUE.
+            intervals.add(new Interval(current.getFirst(), current.getSecond(), Long.MAX_VALUE));
+            return intervals.build();
+          }
+        };
+
+    /**
+     * Applies an event-derived state to the interval currently in progress.
+     * <p/>
+     * If the state is unchanged the interval in progress simply continues. Otherwise it is
+     * closed at {@code timestamp}, appended to {@code intervals}, and a new interval in the
+     * given state is opened at {@code timestamp}.
+     *
+     * @param timestamp Timestamp of the event being processed.
+     * @param state State implied by the event.
+     * @param current State and start timestamp of the interval in progress.
+     * @param intervals Accumulator receiving terminated intervals.
+     * @return The interval in progress after the event: {@code current} itself when the state
+     *         did not change, a new pair otherwise.
+     */
+    private static Pair<SlaState, Long> updateIntervals(
+        long timestamp,
+        SlaState state,
+        Pair<SlaState, Long> current,
+        ImmutableList.Builder<Interval> intervals) {
+
+      SlaState currentState = current.getFirst();
+      if (currentState == state) {
+        // Same state as the interval in progress - nothing to terminate.
+        return current;
+      }
+
+      // State changed: close the interval in progress and open a new one at this event.
+      intervals.add(new Interval(currentState, current.getSecond(), timestamp));
+      return Pair.of(state, timestamp);
+    }
+
+    private AggregatePlatformUptime() {
+      // Private constructor: instantiation is restricted to the enclosing top-level type.
+    }
+
+    /**
+     * Calculates the aggregate platform uptime, as a percentage, of all instances represented
+     * by the given tasks within the given time frame.
+     * <p/>
+     * For every instance the time spent in the REMOVED state is excluded from both the uptime
+     * and the total, while the time spent in the DOWN state is excluded from the uptime only.
+     *
+     * @param tasks Tasks whose event history is evaluated.
+     * @param timeFrame Time frame to aggregate the uptime over.
+     * @return Uptime percentage, or 100.0 if the aggregate total time is not positive.
+     */
+    @Override
+    public Number calculate(Iterable<IScheduledTask> tasks, Range<Long> timeFrame) {
+      // Index all available tasks by InstanceId (JobKey + instance ID).
+      Map<InstanceId, Collection<IScheduledTask>> tasksByInstance =
+          Multimaps.index(tasks, TO_ID).asMap();
+
+      // For every instance, merge the individual task ITaskEvent lists into one timeline that
+      // represents all available history, then convert that timeline into SlaState intervals.
+      Map<InstanceId, List<Interval>> instanceSlaTimeline = Maps.transformValues(
+          tasksByInstance,
+          Functions.compose(TASK_EVENTS_TO_INTERVALS, TASKS_TO_SORTED_TASK_EVENTS));
+
+      // Aggregate the platform uptime of every instance timeline over the given timeFrame.
+      long aggregateUptime = 0;
+      long aggregateTotal = 0;
+      for (List<Interval> intervals : instanceSlaTimeline.values()) {
+        long frameLength = elapsedFromRange(timeFrame);
+        long removedTime = 0;
+        long downTime = 0;
+        for (Interval interval : intervals) {
+          if (!timeFrame.isConnected(interval.range)) {
+            // Interval lies completely outside of the time frame.
+            continue;
+          }
+
+          long overlap = elapsedFromRange(timeFrame.intersection(interval.range));
+          if (interval.state == SlaState.REMOVED) {
+            removedTime += overlap;
+          } else if (interval.state == SlaState.DOWN) {
+            downTime += overlap;
+          }
+        }
+        aggregateTotal += frameLength - removedTime;
+        aggregateUptime += frameLength - removedTime - downTime;
+      }
+
+      // Calculate effective platform uptime or default to 100.0 if no instances are running yet.
+      if (aggregateTotal > 0) {
+        return (double) aggregateUptime * 100 / aggregateTotal;
+      }
+      return 100.0;
+    }
+
+    /**
+     * Returns the length of the range: its upper endpoint minus its lower endpoint.
+     */
+    private static long elapsedFromRange(Range<Long> range) {
+      long upper = range.upperEndpoint();
+      long lower = range.lowerEndpoint();
+      return upper - lower;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/53dc494f/src/main/java/org/apache/aurora/scheduler/sla/SlaGroup.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/sla/SlaGroup.java b/src/main/java/org/apache/aurora/scheduler/sla/SlaGroup.java
new file mode 100644
index 0000000..46be612
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/sla/SlaGroup.java
@@ -0,0 +1,174 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.sla;
+
+import java.util.Map;
+
+import com.google.common.base.Functions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableListMultimap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.Range;
+import com.twitter.common.base.Function;
+
+import org.apache.aurora.scheduler.base.JobKeys;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+
+import static org.apache.aurora.scheduler.base.ResourceAggregates.EMPTY;
+import static org.apache.aurora.scheduler.base.ResourceAggregates.LARGE;
+import static org.apache.aurora.scheduler.base.ResourceAggregates.MEDIUM;
+import static org.apache.aurora.scheduler.base.ResourceAggregates.SMALL;
+import static org.apache.aurora.scheduler.base.ResourceAggregates.XLARGE;
+
+/**
+ * Defines logical grouping criteria to be applied over a set of tasks.
+ */
+interface SlaGroup {
+
+  /**
+   * Generates named groups based on the set of provided tasks.
+   *
+   * @param tasks Set of tasks to generate named groups for.
+   * @return Multimap of group names and relevant tasks.
+   */
+  Multimap<String, IScheduledTask> createNamedGroups(Iterable<IScheduledTask> tasks);
+
+  /**
+   * Pre-configured SLA groupings.
+   * <p/>
+   * Each constant wraps a single {@link SlaGroup} instance. The resource-based groupings
+   * bucket tasks by comparing a task resource value (CPU, RAM or disk) against the ranges
+   * delimited by the {@code EMPTY}, {@code SMALL}, {@code MEDIUM}, {@code LARGE} and
+   * {@code XLARGE} resource aggregates; values above {@code XLARGE} fall into the
+   * {@code xxlarge} bucket.
+   */
+  enum GroupType {
+    JOB(new Job()),
+    CLUSTER(new Cluster()),
+    RESOURCE_CPU(new Resource<>(
+        ImmutableMap.of(
+            "sla_cpu_small_", Range.closed(EMPTY.getNumCpus(), SMALL.getNumCpus()),
+            "sla_cpu_medium_", Range.openClosed(SMALL.getNumCpus(), MEDIUM.getNumCpus()),
+            "sla_cpu_large_", Range.openClosed(MEDIUM.getNumCpus(), LARGE.getNumCpus()),
+            "sla_cpu_xlarge_", Range.openClosed(LARGE.getNumCpus(), XLARGE.getNumCpus()),
+            "sla_cpu_xxlarge_", Range.greaterThan(XLARGE.getNumCpus())),
+        new Function<IScheduledTask, Double>() {
+          @Override
+          public Double apply(IScheduledTask task) {
+            return task.getAssignedTask().getTask().getNumCpus();
+          }
+        }
+    )),
+    RESOURCE_RAM(new Resource<>(
+        ImmutableMap.of(
+            "sla_ram_small_", Range.closed(EMPTY.getRamMb(), SMALL.getRamMb()),
+            "sla_ram_medium_", Range.openClosed(SMALL.getRamMb(), MEDIUM.getRamMb()),
+            "sla_ram_large_", Range.openClosed(MEDIUM.getRamMb(), LARGE.getRamMb()),
+            "sla_ram_xlarge_", Range.openClosed(LARGE.getRamMb(), XLARGE.getRamMb()),
+            "sla_ram_xxlarge_", Range.greaterThan(XLARGE.getRamMb())),
+        new Function<IScheduledTask, Long>() {
+          @Override
+          public Long apply(IScheduledTask task) {
+            return task.getAssignedTask().getTask().getRamMb();
+          }
+        }
+    )),
+    RESOURCE_DISK(new Resource<>(
+        ImmutableMap.of(
+            "sla_disk_small_", Range.closed(EMPTY.getDiskMb(), SMALL.getDiskMb()),
+            "sla_disk_medium_", Range.openClosed(SMALL.getDiskMb(), MEDIUM.getDiskMb()),
+            "sla_disk_large_", Range.openClosed(MEDIUM.getDiskMb(), LARGE.getDiskMb()),
+            "sla_disk_xlarge_", Range.openClosed(LARGE.getDiskMb(), XLARGE.getDiskMb()),
+            "sla_disk_xxlarge_", Range.greaterThan(XLARGE.getDiskMb())),
+        new Function<IScheduledTask, Long>() {
+          @Override
+          public Long apply(IScheduledTask task) {
+            return task.getAssignedTask().getTask().getDiskMb();
+          }
+        }
+    ));
+
+    // Final: enum constants are shared, so the wrapped group must never be reassigned.
+    private final SlaGroup group;
+
+    GroupType(SlaGroup group) {
+      this.group = group;
+    }
+
+    /**
+     * @return The grouping implementation backing this group type.
+     */
+    SlaGroup getSlaGroup() {
+      return group;
+    }
+  }
+
+  /**
+   * Groups tasks by job.
+   */
+  class Job implements SlaGroup {
+    @Override
+    public Multimap<String, IScheduledTask> createNamedGroups(Iterable<IScheduledTask> tasks) {
+      return Multimaps.index(tasks, Functions.compose(new Function<IJobKey, String>() {
+        @Override
+        public String apply(IJobKey jobKey) {
+          return "sla_" + JobKeys.canonicalString(jobKey) + "_";
+        }
+      }, Tasks.SCHEDULED_TO_JOB_KEY));
+    }
+  }
+
+  /**
+   * Groups all tasks available in the cluster.
+   */
+  class Cluster implements SlaGroup {
+    @Override
+    public Multimap<String, IScheduledTask> createNamedGroups(Iterable<IScheduledTask> tasks) {
+      return Multimaps.index(tasks, new Function<IScheduledTask, String>() {
+        @Override
+        public String apply(IScheduledTask task) {
+          return "sla_cluster_";
+        }
+      });
+    }
+  }
+
+  /**
+   * Groups all tasks by their specified resource value.
+   * <p/>
+   * A task is added to every named group whose range contains the task's resource value.
+   *
+   * @param <T> Type of resource to group by.
+   */
+  final class Resource<T extends Number & Comparable<T>> implements SlaGroup {
+
+    private final Map<String, Range<T>> rangesByName;
+    private final Function<IScheduledTask, T> toResourceValue;
+
+    /**
+     * @param map Group names mapped to the resource value range each group covers.
+     * @param function Extracts the resource value to test from a task.
+     */
+    private Resource(Map<String, Range<T>> map, Function<IScheduledTask, T> function) {
+      this.rangesByName = map;
+      this.toResourceValue = function;
+    }
+
+    @Override
+    public Multimap<String, IScheduledTask> createNamedGroups(Iterable<IScheduledTask> tasks) {
+      ImmutableListMultimap.Builder<String, IScheduledTask> groups =
+          ImmutableListMultimap.builder();
+
+      for (Map.Entry<String, Range<T>> entry : rangesByName.entrySet()) {
+        final Range<T> range = entry.getValue();
+        Predicate<IScheduledTask> inRange = new Predicate<IScheduledTask>() {
+          @Override
+          public boolean apply(IScheduledTask task) {
+            return range.contains(toResourceValue.apply(task));
+          }
+        };
+        groups.putAll(entry.getKey(), Iterables.filter(tasks, inRange));
+      }
+      return groups.build();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/53dc494f/src/main/java/org/apache/aurora/scheduler/sla/SlaModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/sla/SlaModule.java b/src/main/java/org/apache/aurora/scheduler/sla/SlaModule.java
new file mode 100644
index 0000000..0c6a2b8
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/sla/SlaModule.java
@@ -0,0 +1,116 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.sla;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.inject.AbstractModule;
+import com.google.inject.BindingAnnotation;
+import com.google.inject.Singleton;
+import com.twitter.common.application.modules.LifecycleModule;
+import com.twitter.common.args.Arg;
+import com.twitter.common.args.CmdLine;
+import com.twitter.common.base.Command;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+
+import org.apache.aurora.scheduler.sla.MetricCalculator.MetricCalculatorSettings;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Binding module for the sla processor.
+ */
+public class SlaModule extends AbstractModule {
+
+  private static final Logger LOG = Logger.getLogger(SlaModule.class.getName());
+
+  @CmdLine(name = "sla_stat_refresh_rate", help = "The SLA stat refresh rate.")
+  private static final Arg<Amount<Long, Time>> SLA_REFRESH_RATE =
+      Arg.create(Amount.of(1L, Time.MINUTES));
+
+  @BindingAnnotation
+  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+  private @interface SlaExecutor { }
+
+  /**
+   * Binds the SLA components: the {@link MetricCalculatorSettings} derived from the
+   * {@code sla_stat_refresh_rate} argument, the singleton {@link MetricCalculator}, a
+   * dedicated single-threaded daemon scheduled executor, and the {@link SlaUpdater} startup
+   * action.
+   */
+  @Override
+  protected void configure() {
+    final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(
+        1,
+        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("SlaStat-%d").build()) {
+
+      // Logs failures of executed tasks, which would otherwise stay hidden inside the future.
+      @Override
+      protected void afterExecute(Runnable runnable, @Nullable Throwable throwable) {
+        super.afterExecute(runnable, throwable);
+        if (throwable != null) {
+          LOG.log(Level.SEVERE, throwable.toString(), throwable);
+        } else if (runnable instanceof Future) {
+          try {
+            Future<?> future = (Future<?>) runnable;
+            // Only inspect completed futures so that get() never blocks, and skip cancelled
+            // ones: get() on a cancelled future throws an unchecked CancellationException,
+            // which must not escape from this hook.
+            if (future.isDone() && !future.isCancelled()) {
+              future.get();
+            }
+          } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+          } catch (ExecutionException ee) {
+            LOG.log(Level.SEVERE, ee.toString(), ee);
+          }
+        }
+      }
+    };
+
+    bind(MetricCalculatorSettings.class).toInstance(
+        new MetricCalculatorSettings(SLA_REFRESH_RATE.get().as(Time.MILLISECONDS)));
+
+    bind(MetricCalculator.class).in(Singleton.class);
+    bind(ScheduledExecutorService.class).annotatedWith(SlaExecutor.class).toInstance(executor);
+    LifecycleModule.bindStartupAction(binder(), SlaUpdater.class);
+  }
+
+  /**
+   * Startup action that schedules the periodic execution of the {@link MetricCalculator} at
+   * the rate given by the {@code sla_stat_refresh_rate} argument.
+   */
+  static class SlaUpdater implements Command {
+    private final ScheduledExecutorService executor;
+    private final MetricCalculator calculator;
+
+    @Inject
+    SlaUpdater(@SlaExecutor ScheduledExecutorService executor, MetricCalculator calculator) {
+      this.executor = checkNotNull(executor);
+      this.calculator = checkNotNull(calculator);
+    }
+
+    /**
+     * Schedules the calculator at a fixed rate, with the initial delay equal to the period.
+     */
+    @Override
+    public void execute() throws RuntimeException {
+      // Schedule in milliseconds, the same resolution handed to MetricCalculatorSettings.
+      // Whole seconds cannot represent a sub-second refresh rate: the period would become 0,
+      // which scheduleAtFixedRate rejects with an IllegalArgumentException.
+      long interval = SLA_REFRESH_RATE.get().as(Time.MILLISECONDS);
+      executor.scheduleAtFixedRate(calculator, interval, interval, TimeUnit.MILLISECONDS);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/53dc494f/src/main/java/org/apache/aurora/scheduler/sla/SlaUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/sla/SlaUtil.java b/src/main/java/org/apache/aurora/scheduler/sla/SlaUtil.java
new file mode 100644
index 0000000..cf09582
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/sla/SlaUtil.java
@@ -0,0 +1,50 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.sla;
+
+import java.util.List;
+
+import com.google.common.collect.Ordering;
+
+/**
+ * Utility methods for the SLA calculations.
+ */
+final class SlaUtil {
+
+  private SlaUtil() {
+    // Utility class.
+  }
+
+  /**
+   * Reports the element found at the given percentile of the list, where the percentile
+   * denotes the share of elements that are greater than the reported one.
+   * <p/>
+   * The input does not have to be sorted: a sorted copy is made internally.
+   * Example: [30, 60, 70, 90], the 75 percentile is 30 (i.e. 75% of elements are greater).
+   *
+   * @param list List to calculate percentile for.
+   * @param percentile Percentile value to apply, on the 0-100 scale.
+   * @return Element at the given percentile, or 0 if the list is empty or the percentile maps
+   *         outside of the list (e.g. a percentile of 100).
+   */
+  static Long percentile(List<Long> list, double percentile) {
+    if (list.isEmpty()) {
+      return 0L;
+    }
+
+    List<Long> sorted = Ordering.natural().immutableSortedCopy(list);
+    int total = sorted.size();
+    // Multiply before dividing: percentile / 100 is often not exactly representable as a
+    // double (e.g. 57 / 100 * 100 evaluates to 56.99999999999999), and flooring such a product
+    // would drop one element from the count.
+    int percentileElements = (int) Math.floor(percentile * total / 100);
+    int index = total - percentileElements - 1;
+    return index >= 0 && index < total ? sorted.get(index) : 0L;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/53dc494f/src/main/java/org/apache/aurora/scheduler/stats/SlotSizeCounter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/stats/SlotSizeCounter.java b/src/main/java/org/apache/aurora/scheduler/stats/SlotSizeCounter.java
index b6610f7..966c498 100644
--- a/src/main/java/org/apache/aurora/scheduler/stats/SlotSizeCounter.java
+++ b/src/main/java/org/apache/aurora/scheduler/stats/SlotSizeCounter.java
@@ -26,8 +26,7 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableMap;
 
-import org.apache.aurora.gen.ResourceAggregate;
-import org.apache.aurora.scheduler.quota.ResourceAggregates;
+import org.apache.aurora.scheduler.base.ResourceAggregates;
 import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -38,10 +37,10 @@ import static com.google.common.base.Preconditions.checkNotNull;
  */
 class SlotSizeCounter implements Runnable {
   private static final Map<String, IResourceAggregate> SLOT_SIZES = ImmutableMap.of(
-      "small", IResourceAggregate.build(new ResourceAggregate(1.0, 1024, 4096)),
-      "medium", IResourceAggregate.build(new ResourceAggregate(4.0, 8192, 16384)),
-      "large", IResourceAggregate.build(new ResourceAggregate(8.0, 16384, 32768)),
-      "xlarge", IResourceAggregate.build(new ResourceAggregate(16.0, 32768, 65536)));
+      "small", ResourceAggregates.SMALL,
+      "medium", ResourceAggregates.MEDIUM,
+      "large", ResourceAggregates.LARGE,
+      "xlarge", ResourceAggregates.XLARGE);
 
   private final Map<String, IResourceAggregate> slotSizes;
   private final MachineResourceProvider machineResourceProvider;

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/53dc494f/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java
index 14af68f..ba67454 100644
--- a/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java
@@ -25,6 +25,7 @@ import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.ResourceAggregates;
 import org.apache.aurora.scheduler.quota.QuotaManager.QuotaException;
 import org.apache.aurora.scheduler.quota.QuotaManager.QuotaManagerImpl;
 import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/53dc494f/src/test/java/org/apache/aurora/scheduler/sla/MetricCalculatorTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/sla/MetricCalculatorTest.java b/src/test/java/org/apache/aurora/scheduler/sla/MetricCalculatorTest.java
new file mode 100644
index 0000000..c5dcc06
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/sla/MetricCalculatorTest.java
@@ -0,0 +1,66 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.sla;
+
+import com.google.common.collect.ImmutableMap;
+import com.twitter.common.base.Supplier;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.Stat;
+import com.twitter.common.stats.StatsProvider;
+import com.twitter.common.testing.easymock.EasyMockTest;
+import com.twitter.common.util.testing.FakeClock;
+
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.sla.MetricCalculator.MetricCalculatorSettings;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.easymock.EasyMock.expect;
+
+/**
+ * Smoke test for {@link MetricCalculator}: drives a single {@code run()} against a mocked
+ * storage and a mocked stats provider.
+ */
+public class MetricCalculatorTest extends EasyMockTest {
+
+  private final FakeClock clock = new FakeClock();
+  private StorageTestUtil storageUtil;
+  private MetricCalculator calculator;
+
+  @Before
+  public void setUp() throws Exception {
+    StatsProvider statsProvider = createMock(StatsProvider.class);
+    // NOTE(review): 10000 is presumably the refresh interval in milliseconds - confirm against
+    // MetricCalculatorSettings.
+    MetricCalculatorSettings settings = new MetricCalculatorSettings(10000);
+    storageUtil = new StorageTestUtil(this);
+    calculator = new MetricCalculator(storageUtil.storage, clock, settings, statsProvider);
+    // Accept any number of gauge registrations, regardless of the stat name and supplier.
+    // NOTE(review): EasyMock.anyObject() is an argument matcher; passing it to andReturn()
+    // looks unintended (it evaluates to null and may leave a recorded matcher behind) -
+    // confirm whether an explicit return value was meant.
+    expect(statsProvider.makeGauge(EasyMock.anyString(), EasyMock.<Supplier<Number>>anyObject()))
+        .andReturn(EasyMock.<Stat<Number>>anyObject())
+        .anyTimes();
+  }
+
+  /**
+   * Runs the calculator once over three tasks, each carrying a single PENDING event stamped
+   * 1, 2 and 3 seconds before the current fake clock time. The test makes no explicit
+   * assertions on the produced stats.
+   */
+  @Test
+  public void runTest() {
+    clock.advance(Amount.of(10L, Time.SECONDS));
+    // NOTE(review): the trailing makeTask argument (0, 1, 2) is presumably the instance ID -
+    // confirm against SlaTestUtil.
+    storageUtil.expectTaskFetch(Query.unscoped(),
+            SlaTestUtil.makeTask(ImmutableMap.of(clock.nowMillis() - 1000, PENDING), 0),
+            SlaTestUtil.makeTask(ImmutableMap.of(clock.nowMillis() - 2000, PENDING), 1),
+            SlaTestUtil.makeTask(ImmutableMap.of(clock.nowMillis() - 3000, PENDING), 2));
+    storageUtil.expectOperations();
+
+    control.replay();
+    calculator.run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/53dc494f/src/test/java/org/apache/aurora/scheduler/sla/SlaAlgorithmTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/sla/SlaAlgorithmTest.java b/src/test/java/org/apache/aurora/scheduler/sla/SlaAlgorithmTest.java
new file mode 100644
index 0000000..92244b7
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/sla/SlaAlgorithmTest.java
@@ -0,0 +1,291 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.sla;
+
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Range;
+import com.google.common.collect.Sets;
+
+import org.apache.aurora.gen.ScheduleStatus;
+
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
+import static org.apache.aurora.gen.ScheduleStatus.INIT;
+import static org.apache.aurora.gen.ScheduleStatus.KILLED;
+import static org.apache.aurora.gen.ScheduleStatus.KILLING;
+import static org.apache.aurora.gen.ScheduleStatus.LOST;
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.gen.ScheduleStatus.RESTARTING;
+import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.apache.aurora.gen.ScheduleStatus.SANDBOX_DELETED;
+import static org.apache.aurora.gen.ScheduleStatus.STARTING;
+import static org.apache.aurora.scheduler.sla.SlaAlgorithm.AlgorithmType.AGGREGATE_PLATFORM_UPTIME;
+import static org.apache.aurora.scheduler.sla.SlaAlgorithm.AlgorithmType.JOB_UPTIME_50;
+import static org.apache.aurora.scheduler.sla.SlaAlgorithm.AlgorithmType.JOB_UPTIME_75;
+import static org.apache.aurora.scheduler.sla.SlaAlgorithm.AlgorithmType.JOB_UPTIME_90;
+import static org.apache.aurora.scheduler.sla.SlaAlgorithm.AlgorithmType.JOB_UPTIME_95;
+import static org.apache.aurora.scheduler.sla.SlaAlgorithm.AlgorithmType.JOB_UPTIME_99;
+import static org.apache.aurora.scheduler.sla.SlaAlgorithm.AlgorithmType.MEDIAN_TIME_TO_ASSIGNED;
+import static org.apache.aurora.scheduler.sla.SlaAlgorithm.AlgorithmType.MEDIAN_TIME_TO_RUNNING;
+import static org.junit.Assert.assertEquals;
+
+public class SlaAlgorithmTest { // Runs each AlgorithmType on hand-built task event histories.
+
+  @Test // PENDING->ASSIGNED gaps: 150 (task ends KILLED), 100, 50; expected result is 50.
+  public void MedianTimeToAssignedEvenTest() {
+    Number actual = MEDIAN_TIME_TO_ASSIGNED.getAlgorithm().calculate(
+        ImmutableSet.of(
+            makeTask(ImmutableMap.of(50L, PENDING, 200L, ASSIGNED, 250L, KILLED)),
+            makeTask(ImmutableMap.of(100L, PENDING, 200L, ASSIGNED, 300L, RUNNING)),
+            makeTask(ImmutableMap.of(200L, PENDING, 250L, ASSIGNED, 350L, STARTING))),
+        Range.<Long>all());
+    assertEquals(50L, actual);
+  }
+
+  @Test // PENDING->ASSIGNED gaps: 150, 100, 50, no terminal task; expected result is 100.
+  public void MedianTimeToAssignedOddTest() {
+    Number actual = MEDIAN_TIME_TO_ASSIGNED.getAlgorithm().calculate(
+        ImmutableSet.of(
+            makeTask(ImmutableMap.of(50L, PENDING, 200L, ASSIGNED, 250L, RUNNING)),
+            makeTask(ImmutableMap.of(100L, PENDING, 200L, ASSIGNED, 300L, RUNNING)),
+            makeTask(ImmutableMap.of(200L, PENDING, 250L, ASSIGNED, 350L, STARTING))),
+        Range.<Long>all());
+    assertEquals(100L, actual);
+  }
+
+  @Test // One task never ASSIGNED, the other ends KILLED; expected result is 0.
+  public void MedianTimeToAssignedZeroTest() {
+    Number actual = MEDIAN_TIME_TO_ASSIGNED.getAlgorithm().calculate(
+        ImmutableSet.of(
+            makeTask(ImmutableMap.of(50L, PENDING)),
+            makeTask(ImmutableMap.of(100L, PENDING, 200L, ASSIGNED, 300L, KILLED))),
+        Range.<Long>all());
+    assertEquals(0L, actual);
+  }
+
+  @Test // One task never ASSIGNED, the other has a gap of 100; expected result is 100.
+  public void MedianTimeToAssignedOneTest() {
+    Number actual = MEDIAN_TIME_TO_ASSIGNED.getAlgorithm().calculate(
+        ImmutableSet.of(
+            makeTask(ImmutableMap.of(50L, PENDING)),
+            makeTask(ImmutableMap.of(100L, PENDING, 200L, ASSIGNED))),
+        Range.<Long>all());
+    assertEquals(100L, actual);
+  }
+
+  @Test(expected = IllegalArgumentException.class) // History lacks a PENDING event.
+  public void MedianTimeToAssignedNoPendingTest() {
+    MEDIAN_TIME_TO_ASSIGNED.getAlgorithm().calculate(
+        ImmutableSet.of(
+            makeTask(ImmutableMap.of(50L, ASSIGNED))),
+        Range.<Long>all());
+  }
+
+  @Test // PENDING->RUNNING gaps of the two counted tasks: 130 and 300; expected result is 130.
+  public void MedianTimeToRunningEvenTest() {
+    Number actual = MEDIAN_TIME_TO_RUNNING.getAlgorithm().calculate(
+        ImmutableSet.of(
+            makeTask(ImmutableMap.of(50L, PENDING)), // Ignored as not RUNNING
+            makeTask(ImmutableMap.of(50L, PENDING, 100L, ASSIGNED, 150L, STARTING, 180L, RUNNING)),
+            makeTask(ImmutableMap.of(100L, PENDING, 200L, ASSIGNED, 300L, STARTING, 400L, RUNNING)),
+            makeTask(ImmutableMap.of(
+                50L, PENDING,
+                100L, ASSIGNED,
+                150L, STARTING,
+                200L, RUNNING,
+                300L, KILLED))), // Ignored due to being terminal.
+        Range.<Long>all());
+    assertEquals(130L, actual);
+  }
+
+  @Test // PENDING->RUNNING gaps of the three counted tasks: 130, 300, 150; expected result is 150.
+  public void MedianTimeToRunningOddTest() {
+    Number actual = MEDIAN_TIME_TO_RUNNING.getAlgorithm().calculate(
+        ImmutableSet.of(
+            makeTask(ImmutableMap.of(50L, PENDING)), // Ignored as not RUNNING
+            makeTask(ImmutableMap.of(50L, PENDING, 100L, ASSIGNED, 150L, STARTING, 180L, RUNNING)),
+            makeTask(ImmutableMap.of(100L, PENDING, 200L, ASSIGNED, 300L, STARTING, 400L, RUNNING)),
+            makeTask(ImmutableMap.of(50L, PENDING, 100L, ASSIGNED, 150L, STARTING, 200L, RUNNING))),
+        Range.<Long>all());
+    assertEquals(150L, actual);
+  }
+
+  @Test // One task never RUNNING, the other ends KILLED; expected result is 0.
+  public void MedianTimeToRunningZeroTest() {
+    Number actual = MEDIAN_TIME_TO_RUNNING.getAlgorithm().calculate(
+        ImmutableSet.of(
+            makeTask(ImmutableMap.of(50L, PENDING)),
+            makeTask(ImmutableMap.of(50L, PENDING, 100L, RUNNING, 200L, KILLED))),
+            Range.<Long>all());
+    assertEquals(0L, actual);
+  }
+
+  @Test // 100 tasks RUNNING for 1..100 seconds; expected result is 50.
+  public void JobUptime50Test() {
+    long now = System.currentTimeMillis();
+    Number actual = JOB_UPTIME_50.getAlgorithm().calculate(
+        makeUptimeTasks(100, now),
+        Range.closed(0L, now));
+    assertEquals(50, actual);
+  }
+
+  @Test // 100 tasks RUNNING for 1..100 seconds; expected result is 25.
+  public void JobUptime75Test() {
+    long now = System.currentTimeMillis();
+    Number actual = JOB_UPTIME_75.getAlgorithm().calculate(
+        makeUptimeTasks(100, now),
+        Range.closed(0L, now));
+    assertEquals(25, actual);
+  }
+
+  @Test // 100 tasks RUNNING for 1..100 seconds; expected result is 10.
+  public void JobUptime90Test() {
+    long now = System.currentTimeMillis();
+    Number actual = JOB_UPTIME_90.getAlgorithm().calculate(
+        makeUptimeTasks(100, now),
+        Range.closed(0L, now));
+    assertEquals(10, actual);
+  }
+
+  @Test // 100 tasks RUNNING for 1..100 seconds; expected result is 5.
+  public void JobUptime95Test() {
+    long now = System.currentTimeMillis();
+    Number actual = JOB_UPTIME_95.getAlgorithm().calculate(
+        makeUptimeTasks(100, now),
+        Range.closed(0L, now));
+    assertEquals(5, actual);
+  }
+
+  @Test // 100 tasks RUNNING for 1..100 seconds; expected result is 1.
+  public void JobUptime99Test() {
+    long now = System.currentTimeMillis();
+    Number actual = JOB_UPTIME_99.getAlgorithm().calculate(
+        makeUptimeTasks(100, now),
+        Range.closed(0L, now));
+    assertEquals(1, actual);
+  }
+
+  @Test // No tasks at all; expected result is 0.
+  public void JobUptimeEmptyTest() {
+    long now = System.currentTimeMillis();
+    Number actual = JOB_UPTIME_99.getAlgorithm().calculate(
+        new LinkedList<IScheduledTask>(),
+        Range.closed(0L, now));
+    assertEquals(0, actual);
+  }
+
+  @Test // The extra task ends KILLED (terminal, despite the test name); result stays at 1.
+  public void JobUptimeNonTerminalIgnoredTest() {
+    long now = System.currentTimeMillis();
+    Set<IScheduledTask> instances = makeUptimeTasks(100, now);
+    instances.add(makeTask(ImmutableMap.of(now - 5000, RUNNING, now - 3000, KILLED)));
+    Number actual = JOB_UPTIME_99.getAlgorithm().calculate(instances, Range.closed(0L, now));
+    assertEquals(1, actual);
+  }
+
+  @Test // The extra task is RESTARTING rather than RUNNING; result stays at 1.
+  public void JobUptimeLiveNonTerminalIgnoredTest() {
+    long now = System.currentTimeMillis();
+    Set<IScheduledTask> instances = makeUptimeTasks(100, now);
+    instances.add(makeTask(ImmutableMap.of(now - 5000, RUNNING, now - 3000, RESTARTING)));
+    Number actual = JOB_UPTIME_99.getAlgorithm().calculate(instances, Range.closed(0L, now));
+    assertEquals(1, actual);
+  }
+
+  @Test // Three instances over [100, 500), one of them PENDING only; expected result is 100.0.
+  public void AggregatePlatformUptimeTest() {
+    Number actual = AGGREGATE_PLATFORM_UPTIME.getAlgorithm().calculate(
+        ImmutableSet.of(
+            makeTask(ImmutableMap.of(100L, PENDING), 0), // Ignored.
+            makeTask(ImmutableMap.of(
+                100L, PENDING,
+                200L, ASSIGNED,
+                300L, STARTING,
+                400L, RUNNING), 1), // 100% uptime.
+            makeTask(ImmutableMap.<Long, ScheduleStatus>builder()
+                .put(5L, INIT)
+                .put(10L, PENDING)
+                .put(20L, ASSIGNED)
+                .put(30L, STARTING)
+                .put(50L, RUNNING)
+                .put(400L, KILLING)
+                .put(450L, KILLED).build(), 2)), // 100% uptime.
+        Range.closedOpen(100L, 500L));
+    assertEquals(100.0, actual);
+  }
+
+  @Test // Instance 0 is LOST at 300 and its replacement RUNNING at 400; expected result is 75.0.
+  public void AggregatePlatformUptimeRecoveredFromDownTest() {
+    Number actual = AGGREGATE_PLATFORM_UPTIME.getAlgorithm().calculate(
+        ImmutableSet.of(
+            makeTask(ImmutableMap.of(50L, RUNNING, 300L, LOST, 310L, KILLED), 0), // DOWN mid range.
+            makeTask(ImmutableMap.of(
+                320L, PENDING,
+                330L, ASSIGNED,
+                350L, STARTING,
+                400L, RUNNING), 0)),  // Recovered within range.
+        Range.closedOpen(100L, 500L));
+    assertEquals(75.0, actual);
+  }
+
+  @Test // RUNNING goes straight to KILLED at 300, with no KILLING event; expected result is 50.0.
+  public void AggregatePlatformUptimeKilledByPlatformTest() {
+    Number actual = AGGREGATE_PLATFORM_UPTIME.getAlgorithm().calculate(
+        ImmutableSet.of(makeTask(ImmutableMap.of(50L, RUNNING, 300L, KILLED), 0)),
+        Range.closedOpen(100L, 500L));
+    assertEquals(50.0, actual);
+  }
+
+  @Test // LOST at 300, SANDBOX_DELETED at 400 and no replacement task; expected result is 50.0.
+  public void AggregatePlatformUptimeSandboxDeletedIgnoredTest() {
+    Number actual = AGGREGATE_PLATFORM_UPTIME.getAlgorithm().calculate(
+        ImmutableSet.of(
+            makeTask(ImmutableMap.of(50L, RUNNING, 300L, LOST, 400L, SANDBOX_DELETED), 0)),
+        Range.closedOpen(100L, 500L));
+    assertEquals(50.0, actual);
+  }
+
+  @Test // The only task is PENDING and never runs; expected result is 100.0.
+  public void AggregatePlatformUptimeEmptyTest() {
+    Number actual = AGGREGATE_PLATFORM_UPTIME.getAlgorithm().calculate(
+        ImmutableSet.of(makeTask(ImmutableMap.of(50L, PENDING), 0)),
+        Range.closedOpen(100L, 500L));
+    assertEquals(100.0, actual);
+  }
+
+  private static Set<IScheduledTask> makeUptimeTasks(int num, long now) {
+    Set<IScheduledTask> instances = Sets.newHashSet(); // Mutable: some tests add to it.
+    for (int i = 0; i < num; i++) {
+      instances.add(makeTask(ImmutableMap.of(now - (i + 1) * 1000, RUNNING))); // i + 1 s ago
+    }
+    return instances;
+  }
+
+  private static IScheduledTask makeTask(Map<Long, ScheduleStatus> events) {
+    return makeTask(events, 0); // Defaults to instance 0.
+  }
+
+  private static IScheduledTask makeTask(Map<Long, ScheduleStatus> events, int instanceId) {
+    return SlaTestUtil.makeTask(events, instanceId); // Shared fixture builder.
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/53dc494f/src/test/java/org/apache/aurora/scheduler/sla/SlaTestUtil.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/sla/SlaTestUtil.java b/src/test/java/org/apache/aurora/scheduler/sla/SlaTestUtil.java
new file mode 100644
index 0000000..b6d4774
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/sla/SlaTestUtil.java
@@ -0,0 +1,65 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.sla;
+
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.Identity;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.gen.TaskEvent;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskEvent;
+
+/**
+ * Static factory helpers that build scheduled tasks with a scripted event history for SLA tests.
+ */
+public final class SlaTestUtil {
+
+  private SlaTestUtil() {
+    // Static helpers only; not meant to be instantiated.
+  }
+
+  /**
+   * Builds a scheduled task whose event history is taken from {@code events}.
+   *
+   * <p>The task is a production service named "job" in environment "env", owned by role "role",
+   * placed on host "host". Its current status is the status of the last event produced by
+   * iterating {@code events}, so the map's iteration order matters.
+   *
+   * @param events Event timestamp to status mapping, iterated in the map's own order.
+   * @param instanceId Instance ID to assign to the task.
+   * @return The assembled immutable task.
+   */
+  public static IScheduledTask makeTask(
+      Map<Long, ScheduleStatus> events, int instanceId) {
+    List<ITaskEvent> history = makeEvents(events);
+    ScheduleStatus latestStatus = Iterables.getLast(history).getStatus();
+
+    TaskConfig config = new TaskConfig()
+        .setJobName("job")
+        .setIsService(true)
+        .setProduction(true)
+        .setEnvironment("env")
+        .setOwner(new Identity("role", "role-user"));
+
+    AssignedTask assigned = new AssignedTask()
+        .setTaskId("task_Id")
+        .setSlaveHost("host")
+        .setInstanceId(instanceId)
+        .setTask(config);
+
+    ScheduledTask task = new ScheduledTask()
+        .setStatus(latestStatus)
+        .setTaskEvents(ITaskEvent.toBuildersList(history))
+        .setAssignedTask(assigned);
+    return IScheduledTask.build(task);
+  }
+
+  /**
+   * Converts a timestamp-to-status map into a list of task events.
+   *
+   * @param events Event timestamp to status mapping.
+   * @return One event per map entry, in the map's iteration order.
+   */
+  public static List<ITaskEvent> makeEvents(Map<Long, ScheduleStatus> events) {
+    ImmutableList.Builder<ITaskEvent> builder = ImmutableList.builder();
+    for (Map.Entry<Long, ScheduleStatus> transition : events.entrySet()) {
+      TaskEvent event = new TaskEvent(transition.getKey(), transition.getValue());
+      builder.add(ITaskEvent.build(event));
+    }
+
+    return builder.build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/53dc494f/src/test/java/org/apache/aurora/scheduler/stats/SlotSizeCounterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/stats/SlotSizeCounterTest.java b/src/test/java/org/apache/aurora/scheduler/stats/SlotSizeCounterTest.java
index 415b48c..c732f57 100644
--- a/src/test/java/org/apache/aurora/scheduler/stats/SlotSizeCounterTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/stats/SlotSizeCounterTest.java
@@ -24,7 +24,7 @@ import com.twitter.common.stats.StatsProvider;
 import com.twitter.common.testing.easymock.EasyMockTest;
 
 import org.apache.aurora.gen.ResourceAggregate;
-import org.apache.aurora.scheduler.quota.ResourceAggregates;
+import org.apache.aurora.scheduler.base.ResourceAggregates;
 import org.apache.aurora.scheduler.stats.SlotSizeCounter.MachineResource;
 import org.apache.aurora.scheduler.stats.SlotSizeCounter.MachineResourceProvider;
 import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/53dc494f/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplTest.java
index 4a13dbc..b985fbb 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplTest.java
@@ -36,7 +36,7 @@ import org.apache.aurora.gen.storage.Snapshot;
 import org.apache.aurora.gen.storage.StoredJob;
 import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.quota.ResourceAggregates;
+import org.apache.aurora.scheduler.base.ResourceAggregates;
 import org.apache.aurora.scheduler.storage.SnapshotStore;
 import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
 import org.apache.aurora.scheduler.storage.entities.ILock;