You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ma...@apache.org on 2014/04/24 01:57:17 UTC
git commit: Implementing Scheduler SLA metrics.
Repository: incubator-aurora
Updated Branches:
refs/heads/master 4e42252be -> 53dc494ff
Implementing Scheduler SLA metrics.
High level overview:
- MetricCalculator runs periodically (every minute), pulls all task history,
packages it into an SlaInstance list, and updates stats;
- Stat calculation is handled by pairing an SlaAlgorithm with a set of
applicable SlaGroups (logical groupings by job, cluster, resource, etc.);
- Stat name is generated by combining group and algorithm name parts.
Bugs closed: AURORA-293
Reviewed at https://reviews.apache.org/r/20398/
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/53dc494f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/53dc494f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/53dc494f
Branch: refs/heads/master
Commit: 53dc494ff2dba934468e32524824aef13f2efc73
Parents: 4e42252
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Wed Apr 23 16:55:07 2014 -0700
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Wed Apr 23 16:55:07 2014 -0700
----------------------------------------------------------------------
.../apache/aurora/scheduler/app/AppModule.java | 2 +
.../scheduler/async/RescheduleCalculator.java | 16 +-
.../scheduler/base/ResourceAggregates.java | 79 ++++
.../org/apache/aurora/scheduler/base/Tasks.java | 8 +
.../aurora/scheduler/quota/QuotaManager.java | 1 +
.../scheduler/quota/ResourceAggregates.java | 66 ---
.../aurora/scheduler/sla/MetricCalculator.java | 170 ++++++++
.../aurora/scheduler/sla/SlaAlgorithm.java | 437 +++++++++++++++++++
.../apache/aurora/scheduler/sla/SlaGroup.java | 174 ++++++++
.../apache/aurora/scheduler/sla/SlaModule.java | 116 +++++
.../apache/aurora/scheduler/sla/SlaUtil.java | 50 +++
.../aurora/scheduler/stats/SlotSizeCounter.java | 11 +-
.../scheduler/quota/QuotaManagerImplTest.java | 1 +
.../scheduler/sla/MetricCalculatorTest.java | 66 +++
.../aurora/scheduler/sla/SlaAlgorithmTest.java | 291 ++++++++++++
.../aurora/scheduler/sla/SlaTestUtil.java | 65 +++
.../scheduler/stats/SlotSizeCounterTest.java | 2 +-
.../storage/log/SnapshotStoreImplTest.java | 2 +-
18 files changed, 1471 insertions(+), 86 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/53dc494f/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/AppModule.java b/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
index 2bedf9b..3dc15ad 100644
--- a/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
@@ -53,6 +53,7 @@ import org.apache.aurora.scheduler.filter.SchedulingFilterImpl;
import org.apache.aurora.scheduler.http.ServletModule;
import org.apache.aurora.scheduler.metadata.MetadataModule;
import org.apache.aurora.scheduler.quota.QuotaModule;
+import org.apache.aurora.scheduler.sla.SlaModule;
import org.apache.aurora.scheduler.state.StateModule;
import org.apache.aurora.scheduler.stats.AsyncStatsModule;
import org.apache.aurora.scheduler.storage.entities.IServerInfo;
@@ -116,6 +117,7 @@ class AppModule extends AbstractModule {
install(new ServletModule());
install(new SchedulerModule());
install(new StateModule());
+ install(new SlaModule());
bind(StatsProvider.class).toInstance(Stats.STATS_PROVIDER);
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/53dc494f/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java b/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java
index bc77bf2..4f79f32 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/RescheduleCalculator.java
@@ -22,7 +22,6 @@ import java.util.logging.Logger;
import javax.inject.Inject;
-import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
@@ -82,14 +81,6 @@ public interface RescheduleCalculator {
private static final Predicate<ScheduleStatus> IS_ACTIVE_STATUS =
Predicates.in(Tasks.ACTIVE_STATES);
- private static final Function<ITaskEvent, ScheduleStatus> TO_STATUS =
- new Function<ITaskEvent, ScheduleStatus>() {
- @Override
- public ScheduleStatus apply(ITaskEvent input) {
- return input.getStatus();
- }
- };
-
private static final Set<ScheduleStatus> INTERRUPTED_TASK_STATES =
EnumSet.of(RESTARTING, KILLING, DRAINING);
@@ -104,7 +95,7 @@ public interface RescheduleCalculator {
// Avoid penalizing tasks that were interrupted by outside action, such as a user
// restarting them.
- if (Iterables.any(Iterables.transform(events, TO_STATUS),
+ if (Iterables.any(Iterables.transform(events, Tasks.TASK_EVENT_TO_STATUS),
Predicates.in(INTERRUPTED_TASK_STATES))) {
return false;
}
@@ -113,8 +104,9 @@ public interface RescheduleCalculator {
ScheduleStatus terminalState = terminalEvent.getStatus();
Preconditions.checkState(Tasks.isTerminated(terminalState));
- ITaskEvent activeEvent =
- Iterables.find(events, Predicates.compose(IS_ACTIVE_STATUS, TO_STATUS));
+ ITaskEvent activeEvent = Iterables.find(
+ events,
+ Predicates.compose(IS_ACTIVE_STATUS, Tasks.TASK_EVENT_TO_STATUS));
long thresholdMs = settings.flappingTaskThreashold.as(Time.MILLISECONDS);
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/53dc494f/src/main/java/org/apache/aurora/scheduler/base/ResourceAggregates.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/ResourceAggregates.java b/src/main/java/org/apache/aurora/scheduler/base/ResourceAggregates.java
new file mode 100644
index 0000000..7804300
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/base/ResourceAggregates.java
@@ -0,0 +1,79 @@
+/**
+ * Copyright 2013 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.base;
+
+import com.google.common.collect.Ordering;
+
+import org.apache.aurora.gen.ResourceAggregate;
+import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
+
+/**
+ * Convenience class for normalizing resource measures between tasks and offers.
+ */
+public final class ResourceAggregates {
+
+ public static final IResourceAggregate EMPTY =
+ IResourceAggregate.build(new ResourceAggregate(0, 0, 0));
+
+ public static final IResourceAggregate SMALL =
+ IResourceAggregate.build(new ResourceAggregate(1.0, 1024, 4096));
+
+ public static final IResourceAggregate MEDIUM =
+ IResourceAggregate.build(new ResourceAggregate(4.0, 8192, 16384));
+
+ public static final IResourceAggregate LARGE =
+ IResourceAggregate.build(new ResourceAggregate(8.0, 16384, 32768));
+
+ public static final IResourceAggregate XLARGE =
+ IResourceAggregate.build(new ResourceAggregate(16.0, 32768, 65536));
+
+ private ResourceAggregates() {
+ // Utility class.
+ }
+
+ /**
+ * Returns a quota with all resource vectors zeroed.
+ *
+ * @return A resource aggregate with all resource vectors zeroed.
+ */
+ public static IResourceAggregate none() {
+ return EMPTY;
+ }
+
+ /**
+ * a * m
+ */
+ public static IResourceAggregate scale(IResourceAggregate a, int m) {
+ return IResourceAggregate.build(new ResourceAggregate()
+ .setNumCpus(a.getNumCpus() * m)
+ .setRamMb(a.getRamMb() * m)
+ .setDiskMb(a.getDiskMb() * m));
+ }
+
+ /**
+ * a / b
+ * <p>
+ * This calculates how many times {@code b} "fits into" {@code a}. Behavior is undefined when
+ * {@code b} contains resources with a value of zero.
+ */
+ public static int divide(IResourceAggregate a, IResourceAggregate b) {
+ return Ordering.natural().min(
+ a.getNumCpus() / b.getNumCpus(),
+ (double) a.getRamMb() / b.getRamMb(),
+ (double) a.getDiskMb() / b.getDiskMb()
+ ).intValue();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/53dc494f/src/main/java/org/apache/aurora/scheduler/base/Tasks.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/Tasks.java b/src/main/java/org/apache/aurora/scheduler/base/Tasks.java
index fae2d23..e5cc469 100644
--- a/src/main/java/org/apache/aurora/scheduler/base/Tasks.java
+++ b/src/main/java/org/apache/aurora/scheduler/base/Tasks.java
@@ -119,6 +119,14 @@ public final class Tasks {
public static final Function<IScheduledTask, String> SCHEDULED_TO_SLAVE_HOST =
Functions.compose(ASSIGNED_TO_SLAVE_HOST, SCHEDULED_TO_ASSIGNED);
+  /**
+   * Extracts the {@link ScheduleStatus} recorded by a task event.
+   */
+  public static final Function<ITaskEvent, ScheduleStatus> TASK_EVENT_TO_STATUS =
+      new Function<ITaskEvent, ScheduleStatus>() {
+        @Override
+        public ScheduleStatus apply(ITaskEvent event) {
+          return event.getStatus();
+        }
+      };
+
/**
* Different states that an active task may be in.
*/
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/53dc494f/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java b/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
index 56f47b6..ea86d52 100644
--- a/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/quota/QuotaManager.java
@@ -23,6 +23,7 @@ import com.google.inject.Inject;
import org.apache.aurora.gen.ResourceAggregate;
import org.apache.aurora.scheduler.base.JobKeys;
import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.ResourceAggregates;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/53dc494f/src/main/java/org/apache/aurora/scheduler/quota/ResourceAggregates.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/quota/ResourceAggregates.java b/src/main/java/org/apache/aurora/scheduler/quota/ResourceAggregates.java
deleted file mode 100644
index 444c287..0000000
--- a/src/main/java/org/apache/aurora/scheduler/quota/ResourceAggregates.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Copyright 2013 Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.quota;
-
-import com.google.common.collect.Ordering;
-
-import org.apache.aurora.gen.ResourceAggregate;
-import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
-
-/**
- * Convenience class for normalizing resource measures between tasks and offers.
- */
-public final class ResourceAggregates {
- private static final IResourceAggregate EMPTY_RESOURCE_AGGREGATE =
- IResourceAggregate.build(new ResourceAggregate(0, 0, 0));
-
- private ResourceAggregates() {
- // Utility class.
- }
-
- /**
- * Returns a quota with all resource vectors zeroed.
- *
- * @return A resource aggregate with all resource vectors zeroed.
- */
- public static IResourceAggregate none() {
- return EMPTY_RESOURCE_AGGREGATE;
- }
-
- /**
- * a * m
- */
- public static IResourceAggregate scale(IResourceAggregate a, int m) {
- return IResourceAggregate.build(new ResourceAggregate()
- .setNumCpus(a.getNumCpus() * m)
- .setRamMb(a.getRamMb() * m)
- .setDiskMb(a.getDiskMb() * m));
- }
-
- /**
- * a / b
- * <p>
- * This calculates how many times {@code b} "fits into" {@code a}. Behavior is undefined when
- * {@code b} contains resources with a value of zero.
- */
- public static int divide(IResourceAggregate a, IResourceAggregate b) {
- return Ordering.natural().min(
- a.getNumCpus() / b.getNumCpus(),
- (double) a.getRamMb() / b.getRamMb(),
- (double) a.getDiskMb() / b.getDiskMb()
- ).intValue();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/53dc494f/src/main/java/org/apache/aurora/scheduler/sla/MetricCalculator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/sla/MetricCalculator.java b/src/main/java/org/apache/aurora/scheduler/sla/MetricCalculator.java
new file mode 100644
index 0000000..bc90030
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/sla/MetricCalculator.java
@@ -0,0 +1,170 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.sla;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.inject.Inject;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.base.Supplier;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Range;
+
+import com.twitter.common.inject.TimedInterceptor.Timed;
+import com.twitter.common.stats.StatsProvider;
+import com.twitter.common.util.Clock;
+
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.sla.SlaAlgorithm.AlgorithmType;
+import org.apache.aurora.scheduler.sla.SlaGroup.GroupType;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import static org.apache.aurora.scheduler.sla.SlaAlgorithm.AlgorithmType.AGGREGATE_PLATFORM_UPTIME;
+import static org.apache.aurora.scheduler.sla.SlaAlgorithm.AlgorithmType.JOB_UPTIME_50;
+import static org.apache.aurora.scheduler.sla.SlaAlgorithm.AlgorithmType.JOB_UPTIME_75;
+import static org.apache.aurora.scheduler.sla.SlaAlgorithm.AlgorithmType.JOB_UPTIME_90;
+import static org.apache.aurora.scheduler.sla.SlaAlgorithm.AlgorithmType.JOB_UPTIME_95;
+import static org.apache.aurora.scheduler.sla.SlaAlgorithm.AlgorithmType.JOB_UPTIME_99;
+import static org.apache.aurora.scheduler.sla.SlaAlgorithm.AlgorithmType.MEDIAN_TIME_TO_ASSIGNED;
+import static org.apache.aurora.scheduler.sla.SlaAlgorithm.AlgorithmType.MEDIAN_TIME_TO_RUNNING;
+import static org.apache.aurora.scheduler.sla.SlaGroup.GroupType.CLUSTER;
+import static org.apache.aurora.scheduler.sla.SlaGroup.GroupType.JOB;
+import static org.apache.aurora.scheduler.sla.SlaGroup.GroupType.RESOURCE_CPU;
+import static org.apache.aurora.scheduler.sla.SlaGroup.GroupType.RESOURCE_DISK;
+import static org.apache.aurora.scheduler.sla.SlaGroup.GroupType.RESOURCE_RAM;
+
+/**
+ * Responsible for calculating and exporting SLA metrics.
+ */
+class MetricCalculator implements Runnable {
+
+  /**
+   * SLA algorithms to compute, each mapped to the task groupings it is applied over.
+   */
+  private static final Multimap<AlgorithmType, GroupType> METRICS =
+      ImmutableMultimap.<AlgorithmType, GroupType>builder()
+          .put(JOB_UPTIME_50, JOB)
+          .put(JOB_UPTIME_75, JOB)
+          .put(JOB_UPTIME_90, JOB)
+          .put(JOB_UPTIME_95, JOB)
+          .put(JOB_UPTIME_99, JOB)
+          .putAll(AGGREGATE_PLATFORM_UPTIME, JOB, CLUSTER)
+          .putAll(MEDIAN_TIME_TO_ASSIGNED, JOB, CLUSTER, RESOURCE_CPU, RESOURCE_RAM, RESOURCE_DISK)
+          .putAll(MEDIAN_TIME_TO_RUNNING, JOB, CLUSTER, RESOURCE_CPU, RESOURCE_RAM, RESOURCE_DISK)
+          .build();
+
+  /** Matches task configs whose {@code isService} flag is set. */
+  private static final Predicate<ITaskConfig> IS_SERVICE =
+      new Predicate<ITaskConfig>() {
+        @Override
+        public boolean apply(ITaskConfig task) {
+          return task.isIsService();
+        }
+      };
+
+  // One Counter per exported stat name, created lazily on first use.
+  private final LoadingCache<String, Counter> metricCache;
+  private final Storage storage;
+  private final Clock clock;
+  private final MetricCalculatorSettings settings;
+
+  /**
+   * Settings for {@link MetricCalculator}.
+   */
+  static class MetricCalculatorSettings {
+    // Length, in milliseconds, of the trailing window each run() evaluates.
+    private final long refreshRateMs;
+
+    MetricCalculatorSettings(long refreshRateMs) {
+      this.refreshRateMs = refreshRateMs;
+    }
+  }
+
+  /**
+   * Holds the latest value of one SLA metric and exposes it as a gauge. The gauge is registered
+   * with the {@link StatsProvider} the first time {@link #set} is called.
+   * <p>
+   * NOTE(review): {@code exported} is not synchronized; this assumes {@code set} is not called
+   * concurrently (e.g. run() is driven by a single thread) -- confirm against SlaModule.
+   */
+  private static class Counter implements Supplier<Number> {
+    private final AtomicReference<Number> value = new AtomicReference<>((Number) 0);
+    private final StatsProvider statsProvider;
+    private boolean exported;
+
+    Counter(StatsProvider statsProvider) {
+      this.statsProvider = statsProvider;
+    }
+
+    @Override
+    public Number get() {
+      return value.get();
+    }
+
+    private void set(String name, Number newValue) {
+      if (!exported) {
+        statsProvider.makeGauge(name, this);
+        exported = true;
+      }
+      value.set(newValue);
+    }
+  }
+
+  @Inject
+  MetricCalculator(
+      Storage storage,
+      Clock clock,
+      MetricCalculatorSettings settings,
+      final StatsProvider statsProvider) {
+
+    this.storage = checkNotNull(storage);
+    this.clock = checkNotNull(clock);
+    this.settings = checkNotNull(settings);
+
+    checkNotNull(statsProvider);
+    this.metricCache = CacheBuilder.newBuilder().build(
+        new CacheLoader<String, Counter>() {
+          @Override
+          public Counter load(String key) {
+            return new Counter(statsProvider);
+          }
+        });
+  }
+
+  /**
+   * Recomputes and publishes every configured SLA metric.
+   * <p>
+   * Production service tasks are fetched from storage. Each (algorithm, group type) pair in
+   * {@link #METRICS} is evaluated over the window {@code [now - refreshRateMs, now)}, once per
+   * named group. The stat name is the group name followed by the algorithm name.
+   */
+  @Timed("sla_stats_computation")
+  @Override
+  public void run() {
+    List<IScheduledTask> tasks =
+        FluentIterable.from(Storage.Util.weaklyConsistentFetchTasks(storage, Query.unscoped()))
+            .filter(Predicates.compose(
+                Predicates.and(Tasks.IS_PRODUCTION, IS_SERVICE),
+                Tasks.SCHEDULED_TO_INFO)).toList();
+
+    long nowMs = clock.nowMillis();
+    // Built once so every metric in this run is evaluated over the identical window.
+    Range<Long> timeFrame = Range.closedOpen(nowMs - settings.refreshRateMs, nowMs);
+
+    for (Entry<AlgorithmType, GroupType> slaMetric : METRICS.entries()) {
+      AlgorithmType algoType = slaMetric.getKey();
+      for (Entry<String, Collection<IScheduledTask>> namedGroup
+          : slaMetric.getValue().getSlaGroup().createNamedGroups(tasks).asMap().entrySet()) {
+
+        String metricName = namedGroup.getKey() + algoType.getAlgorithmName();
+        metricCache.getUnchecked(metricName)
+            .set(metricName, algoType.getAlgorithm().calculate(namedGroup.getValue(), timeFrame));
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/53dc494f/src/main/java/org/apache/aurora/scheduler/sla/SlaAlgorithm.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/sla/SlaAlgorithm.java b/src/main/java/org/apache/aurora/scheduler/sla/SlaAlgorithm.java
new file mode 100644
index 0000000..81efb2d
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/sla/SlaAlgorithm.java
@@ -0,0 +1,437 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.sla;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+import com.google.common.base.Objects;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.Ordering;
+import com.google.common.collect.Range;
+import com.twitter.common.collections.Pair;
+
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.scheduler.base.JobKeys;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskEvent;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Defines an SLA algorithm to be applied to a {@link IScheduledTask}
+ * set for calculating a specific SLA metric.
+ */
+interface SlaAlgorithm {
+
+  /**
+   * Applies this algorithm to a set of {@link IScheduledTask} to
+   * produce a named metric value over the specified time frame.
+   *
+   * @param tasks Set of tasks to apply this algorithm to.
+   * @param timeFrame Relevant time frame.
+   * @return Produced metric value.
+   */
+  Number calculate(Iterable<IScheduledTask> tasks, Range<Long> timeFrame);
+
+  /**
+   * Pre-configured SLA algorithms, each paired with the name fragment appended to a group
+   * name to form the exported stat name.
+   */
+  enum AlgorithmType {
+
+    // Job uptime names are formatted with Locale.ROOT so stat names do not depend on the JVM
+    // default locale (e.g. "99,00" instead of "99.00" under a German locale).
+    JOB_UPTIME_99(new JobUptime(99f), String.format(Locale.ROOT, JobUptime.NAME_FORMAT, 99f)),
+    JOB_UPTIME_95(new JobUptime(95f), String.format(Locale.ROOT, JobUptime.NAME_FORMAT, 95f)),
+    JOB_UPTIME_90(new JobUptime(90f), String.format(Locale.ROOT, JobUptime.NAME_FORMAT, 90f)),
+    JOB_UPTIME_75(new JobUptime(75f), String.format(Locale.ROOT, JobUptime.NAME_FORMAT, 75f)),
+    JOB_UPTIME_50(new JobUptime(50f), String.format(Locale.ROOT, JobUptime.NAME_FORMAT, 50f)),
+    AGGREGATE_PLATFORM_UPTIME(new AggregatePlatformUptime(), "platform_uptime_percent"),
+    MEDIAN_TIME_TO_ASSIGNED(new MedianAlgorithm(ScheduleStatus.ASSIGNED), "mtta_ms"),
+    MEDIAN_TIME_TO_RUNNING(new MedianAlgorithm(ScheduleStatus.RUNNING), "mttr_ms");
+
+    private final SlaAlgorithm algorithm;
+    private final String name;
+
+    AlgorithmType(SlaAlgorithm algorithm, String name) {
+      this.algorithm = algorithm;
+      this.name = name;
+    }
+
+    SlaAlgorithm getAlgorithm() {
+      return algorithm;
+    }
+
+    String getAlgorithmName() {
+      return name;
+    }
+  }
+
+  /**
+   * Median time to status SLA algorithm.
+   * Represents the median time spent waiting for a set of tasks to reach the specified status.
+   * A combined metric that helps track how task scheduling performance depends on the
+   * requested resources (user scope) as well as on the internal scheduler bin-packing algorithm
+   * efficiency (platform scope).
+   * <p/>
+   * Median time calculated as:
+   * <pre>
+   * MT = MEDIAN(Wait_times)
+   * where:
+   *   Wait_times - a collection of time intervals between PENDING and specified task state.
+   * </pre>
+   */
+  final class MedianAlgorithm implements SlaAlgorithm {
+
+    private final ScheduleStatus status;
+
+    private MedianAlgorithm(ScheduleStatus status) {
+      this.status = status;
+    }
+
+    @Override
+    public Number calculate(Iterable<IScheduledTask> tasks, Range<Long> timeFrame) {
+      // Only tasks currently in an active state contribute; timeFrame is not consulted here.
+      Iterable<IScheduledTask> activeTasks = FluentIterable.from(tasks)
+          .filter(Predicates.compose(Predicates.in(Tasks.ACTIVE_STATES), Tasks.GET_STATUS));
+
+      List<Long> waitTimes = Lists.newArrayList();
+      for (IScheduledTask task : activeTasks) {
+        // 0 doubles as the "no PENDING event seen yet" marker.
+        long pendingTs = 0;
+        for (ITaskEvent event : task.getTaskEvents()) {
+          if (event.getStatus() == ScheduleStatus.PENDING) {
+            pendingTs = event.getTimestamp();
+          } else if (event.getStatus() == status) {
+
+            if (pendingTs == 0) {
+              throw new IllegalArgumentException("SLA: missing PENDING status for:"
+                  + task.getAssignedTask().getTaskId());
+            }
+
+            waitTimes.add(event.getTimestamp() - pendingTs);
+            // Only the first matching event (in getTaskEvents() order) is measured.
+            break;
+          }
+        }
+      }
+
+      return SlaUtil.percentile(waitTimes, 50.0);
+    }
+  }
+
+  /**
+   * Job uptime SLA algorithm.
+   * Represents the percentage of instances considered to be in running state for
+   * the specified duration relative to SLA calculation time.
+   * <p/>
+   * Only tasks currently RUNNING are considered. Each task's uptime is measured from its latest
+   * event to the upper endpoint of the time frame, and the result is reported in whole seconds.
+   */
+  final class JobUptime implements SlaAlgorithm {
+
+    // Formatted with the percentile value to build the metric name suffix.
+    private static final String NAME_FORMAT = "job_uptime_%.2f_sec";
+    private final float percentile;
+
+    private static final Predicate<IScheduledTask> IS_RUNNING =
+        Predicates.compose(
+            Predicates.in(ImmutableSet.of(ScheduleStatus.RUNNING)),
+            Tasks.GET_STATUS);
+
+    private static final Function<IScheduledTask, ITaskEvent> TASK_TO_EVENT =
+        new Function<IScheduledTask, ITaskEvent>() {
+          @Override
+          public ITaskEvent apply(IScheduledTask task) {
+            return Tasks.getLatestEvent(task);
+          }
+        };
+
+    private JobUptime(float percentile) {
+      this.percentile = percentile;
+    }
+
+    @Override
+    public Number calculate(Iterable<IScheduledTask> tasks, final Range<Long> timeFrame) {
+      List<Long> uptimes = FluentIterable.from(tasks)
+          .filter(IS_RUNNING)
+          .transform(Functions.compose(
+              new Function<ITaskEvent, Long>() {
+                @Override
+                public Long apply(ITaskEvent event) {
+                  return timeFrame.upperEndpoint() - event.getTimestamp();
+                }
+              },
+              TASK_TO_EVENT)).toList();
+
+      // Convert the selected uptime from milliseconds to whole (floored) seconds.
+      return (int) Math.floor((double) SlaUtil.percentile(uptimes, percentile) / 1000);
+    }
+  }
+
+  /**
+   * Aggregate Platform Uptime SLA algorithm.
+   * Aggregate amount of runnable time the platform managed to deliver for a set of tasks from
+   * the moment they reach RUNNING status. Excludes any time a task is not in a runnable state
+   * due to user activities (e.g. newly created and waiting for host assignment, or
+   * restarted/killed by the user).
+   * <p/>
+   * Aggregate platform uptime calculated as:
+   * <pre>
+   * APU = SUM(Up_time) / SUM(SI - Removed_time)
+   * where:
+   *   Up_time - the aggregate instance UP time over the sampling interval (SI);
+   *   SI - sampling interval (e.g. 1 minute);
+   *   Removed_time - the aggregate instance REMOVED time over the sampling interval.
+   * </pre>
+   */
+  final class AggregatePlatformUptime implements SlaAlgorithm {
+
+    /**
+     * Task platform SLA state.
+     */
+    enum SlaState {
+      /**
+       * Starts a period when the task is not expected to be UP due to user initiated action
+       * or failure.
+       * <p/>
+       * This period is ignored for the calculation purposes.
+       */
+      REMOVED,
+
+      /**
+       * Starts a period when the task cannot reach the UP state for some non-user-related reason.
+       * <p/>
+       * Only platform-incurred task state transitions are considered here. If a task is newly
+       * created (e.g. by job create/update) the amount of time a task spends to reach its UP
+       * state is not counted towards platform downtime. For example, a newly added PENDING task
+       * is considered as REMOVED, whereas a PENDING task rescheduled from LOST will be considered
+       * as DOWN. This approach ensures this metric is not sensitive to user-initiated activities
+       * and is a true reflection of the system recovery performance.
+       */
+      DOWN,
+
+      /**
+       * Starts a period when the task is considered to be up and running from the Aurora
+       * platform standpoint.
+       * <p/>
+       * Note: The platform uptime does not necessarily equate to the real application
+       * availability. This is because a hosted application needs time to deploy, initialize,
+       * and start executing.
+       */
+      UP
+    }
+
+    /** A span of time {@code [start, end)} during which an instance was in one SlaState. */
+    private static class Interval {
+      private final SlaState state;
+      private final Range<Long> range;
+
+      Interval(SlaState state, long start, long end) {
+        this.state = state;
+        range = Range.closedOpen(start, end);
+      }
+    }
+
+    /** Identifies a task instance across reschedules: job key plus instance ID. */
+    private static class InstanceId {
+      private final IJobKey jobKey;
+      private final int id;
+
+      InstanceId(IJobKey jobKey, int instanceId) {
+        this.jobKey = checkNotNull(jobKey);
+        this.id = instanceId;
+      }
+
+      @Override
+      public boolean equals(Object o) {
+        if (!(o instanceof InstanceId)) {
+          return false;
+        }
+
+        InstanceId other = (InstanceId) o;
+        return id == other.id && Objects.equal(jobKey, other.jobKey);
+      }
+
+      @Override
+      public int hashCode() {
+        return Objects.hashCode(jobKey, id);
+      }
+    }
+
+    private static final Function<IScheduledTask, InstanceId> TO_ID =
+        new Function<IScheduledTask, InstanceId>() {
+          @Override
+          public InstanceId apply(IScheduledTask task) {
+            return new InstanceId(
+                JobKeys.from(task.getAssignedTask().getTask()),
+                task.getAssignedTask().getInstanceId());
+          }
+        };
+
+    private static final Function<ITaskEvent, Long> TASK_EVENT_TO_TIMESTAMP =
+        new Function<ITaskEvent, Long>() {
+          @Override
+          public Long apply(ITaskEvent taskEvent) {
+            return taskEvent.getTimestamp();
+          }
+        };
+
+    /**
+     * Combine all task events per given instance into the unified sorted instance history view.
+     */
+    private static final Function<Collection<IScheduledTask>, List<ITaskEvent>>
+        TASKS_TO_SORTED_TASK_EVENTS = new Function<Collection<IScheduledTask>, List<ITaskEvent>>() {
+          @Override
+          public List<ITaskEvent> apply(Collection<IScheduledTask> tasks) {
+            List<ITaskEvent> result = Lists.newArrayList();
+            for (IScheduledTask task : tasks) {
+              result.addAll(task.getTaskEvents());
+            }
+
+            return Ordering.natural()
+                .onResultOf(TASK_EVENT_TO_TIMESTAMP).immutableSortedCopy(result);
+          }
+        };
+
+    /**
+     * Convert instance history into the {@link SlaState} based {@link Interval} list.
+     */
+    private static final Function<List<ITaskEvent>, List<Interval>> TASK_EVENTS_TO_INTERVALS =
+        new Function<List<ITaskEvent>, List<Interval>>() {
+          @Override
+          public List<Interval> apply(List<ITaskEvent> events) {
+
+            ImmutableList.Builder<Interval> intervals = ImmutableList.builder();
+            // Timeline starts REMOVED at time 0, so time before the first event counts as REMOVED.
+            Pair<SlaState, Long> current = Pair.of(SlaState.REMOVED, 0L);
+
+            for (ITaskEvent event : events) {
+              long timestamp = event.getTimestamp();
+
+              // Event status in the instance timeline signifies either of the following:
+              //   - termination of the existing SlaState interval AND start of a new one;
+              //   - continuation of the existing matching SlaState interval.
+              switch (event.getStatus()) {
+                case LOST:
+                case DRAINING:
+                case PREEMPTING:
+                  current = updateIntervals(timestamp, SlaState.DOWN, current, intervals);
+                  break;
+
+                case PENDING:
+                case ASSIGNED:
+                case STARTING:
+                  // A DOWN instance stays DOWN while it is being rescheduled.
+                  if (current.getFirst() != SlaState.DOWN) {
+                    current = updateIntervals(timestamp, SlaState.REMOVED, current, intervals);
+                  }
+                  break;
+
+                case THROTTLED:
+                case FINISHED:
+                case RESTARTING:
+                case FAILED:
+                case KILLING:
+                  current = updateIntervals(timestamp, SlaState.REMOVED, current, intervals);
+                  break;
+
+                case RUNNING:
+                  current = updateIntervals(timestamp, SlaState.UP, current, intervals);
+                  break;
+
+                case KILLED:
+                  // KILLED while UP counts as platform downtime; otherwise the state is kept.
+                  if (current.getFirst() == SlaState.UP) {
+                    current = updateIntervals(timestamp, SlaState.DOWN, current, intervals);
+                  }
+                  break;
+
+                case INIT:
+                case SANDBOX_DELETED:
+                  // Ignore.
+                  break;
+
+                default:
+                  throw new IllegalArgumentException("Unsupported status:" + event.getStatus());
+              }
+            }
+            // Add the last event interval, open-ended into the future.
+            intervals.add(new Interval(current.getFirst(), current.getSecond(), Long.MAX_VALUE));
+            return intervals.build();
+          }
+        };
+
+    /**
+     * Closes the current interval and opens a new one when {@code state} differs from it.
+     *
+     * @return The (possibly unchanged) current state and its start timestamp.
+     */
+    private static Pair<SlaState, Long> updateIntervals(
+        long timestamp,
+        SlaState state,
+        Pair<SlaState, Long> current,
+        ImmutableList.Builder<Interval> intervals) {
+
+      if (current.getFirst() == state) {
+        // Current interval state matches the event state - skip.
+        return current;
+      } else {
+        // Terminate current interval, add it to list and start a new interval.
+        intervals.add(new Interval(current.getFirst(), current.getSecond(), timestamp));
+        return Pair.of(state, timestamp);
+      }
+    }
+
+    private AggregatePlatformUptime() {
+      // Instantiated only by AlgorithmType.
+    }
+
+    @Override
+    public Number calculate(Iterable<IScheduledTask> tasks, Range<Long> timeFrame) {
+      // Given the set of tasks do the following:
+      //  - index all available tasks by InstanceId (JobKey + instance ID);
+      //  - combine individual task ITaskEvent lists into the instance based timeline to represent
+      //    all available history for a given task instance;
+      //  - convert instance timeline into the SlaState intervals.
+      Map<InstanceId, List<Interval>> instanceSlaTimeline =
+          Maps.transformValues(
+              Multimaps.index(tasks, TO_ID).asMap(),
+              Functions.compose(TASK_EVENTS_TO_INTERVALS, TASKS_TO_SORTED_TASK_EVENTS));
+
+      // Given the instance timeline converted to SlaState-based time intervals, aggregate the
+      // platform uptime per given timeFrame.
+      long aggregateUptime = 0;
+      long aggregateTotal = 0;
+      for (List<Interval> intervals : instanceSlaTimeline.values()) {
+        long instanceUptime = elapsedFromRange(timeFrame);
+        long instanceTotal = instanceUptime;
+        for (Interval interval : intervals) {
+          if (timeFrame.isConnected(interval.range)) {
+            long intersection = elapsedFromRange(timeFrame.intersection(interval.range));
+            if (interval.state == SlaState.REMOVED) {
+              // REMOVED time is excluded from both numerator and denominator.
+              instanceUptime -= intersection;
+              instanceTotal -= intersection;
+            } else if (interval.state == SlaState.DOWN) {
+              // DOWN time counts against uptime only.
+              instanceUptime -= intersection;
+            }
+          }
+        }
+        aggregateUptime += instanceUptime;
+        aggregateTotal += instanceTotal;
+      }
+
+      // Calculate effective platform uptime, or default to 100.0 when no countable time exists.
+      return aggregateTotal > 0 ? (double) aggregateUptime * 100 / aggregateTotal : 100.0;
+    }
+
+    private static long elapsedFromRange(Range<Long> range) {
+      return range.upperEndpoint() - range.lowerEndpoint();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/53dc494f/src/main/java/org/apache/aurora/scheduler/sla/SlaGroup.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/sla/SlaGroup.java b/src/main/java/org/apache/aurora/scheduler/sla/SlaGroup.java
new file mode 100644
index 0000000..46be612
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/sla/SlaGroup.java
@@ -0,0 +1,174 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.sla;
+
+import java.util.Map;
+
+import com.google.common.base.Functions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableListMultimap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.Range;
+import com.twitter.common.base.Function;
+
+import org.apache.aurora.scheduler.base.JobKeys;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+
+import static org.apache.aurora.scheduler.base.ResourceAggregates.EMPTY;
+import static org.apache.aurora.scheduler.base.ResourceAggregates.LARGE;
+import static org.apache.aurora.scheduler.base.ResourceAggregates.MEDIUM;
+import static org.apache.aurora.scheduler.base.ResourceAggregates.SMALL;
+import static org.apache.aurora.scheduler.base.ResourceAggregates.XLARGE;
+
+/**
+ * Defines the logical grouping criteria applied to a set of tasks.
+ */
+interface SlaGroup {
+
+  /**
+   * Generates named groups based on the set of provided tasks.
+   *
+   * @param tasks Set of tasks to generate named groups for.
+   * @return Multimap of group names and relevant tasks.
+   */
+  Multimap<String, IScheduledTask> createNamedGroups(Iterable<IScheduledTask> tasks);
+
+  /**
+   * Pre-configured SLA groupings.
+   *
+   * <p>The resource groupings bucket tasks by a single resource value using the predefined
+   * SMALL/MEDIUM/LARGE/XLARGE aggregates as boundaries. The first bucket is closed on both ends
+   * (starting at the EMPTY aggregate's value), the middle buckets are open below and closed
+   * above, and the last bucket covers everything strictly above XLARGE.
+   */
+  enum GroupType {
+    JOB(new Job()),
+    CLUSTER(new Cluster()),
+    RESOURCE_CPU(new Resource<>(
+        ImmutableMap.of(
+            "sla_cpu_small_", Range.closed(EMPTY.getNumCpus(), SMALL.getNumCpus()),
+            "sla_cpu_medium_", Range.openClosed(SMALL.getNumCpus(), MEDIUM.getNumCpus()),
+            "sla_cpu_large_", Range.openClosed(MEDIUM.getNumCpus(), LARGE.getNumCpus()),
+            "sla_cpu_xlarge_", Range.openClosed(LARGE.getNumCpus(), XLARGE.getNumCpus()),
+            "sla_cpu_xxlarge_", Range.greaterThan(XLARGE.getNumCpus())),
+        new Function<IScheduledTask, Double>() {
+          @Override
+          public Double apply(IScheduledTask task) {
+            return task.getAssignedTask().getTask().getNumCpus();
+          }
+        }
+    )),
+    RESOURCE_RAM(new Resource<>(
+        ImmutableMap.of(
+            "sla_ram_small_", Range.closed(EMPTY.getRamMb(), SMALL.getRamMb()),
+            "sla_ram_medium_", Range.openClosed(SMALL.getRamMb(), MEDIUM.getRamMb()),
+            "sla_ram_large_", Range.openClosed(MEDIUM.getRamMb(), LARGE.getRamMb()),
+            "sla_ram_xlarge_", Range.openClosed(LARGE.getRamMb(), XLARGE.getRamMb()),
+            "sla_ram_xxlarge_", Range.greaterThan(XLARGE.getRamMb())),
+        new Function<IScheduledTask, Long>() {
+          @Override
+          public Long apply(IScheduledTask task) {
+            return task.getAssignedTask().getTask().getRamMb();
+          }
+        }
+    )),
+    RESOURCE_DISK(new Resource<>(
+        ImmutableMap.of(
+            "sla_disk_small_", Range.closed(EMPTY.getDiskMb(), SMALL.getDiskMb()),
+            "sla_disk_medium_", Range.openClosed(SMALL.getDiskMb(), MEDIUM.getDiskMb()),
+            "sla_disk_large_", Range.openClosed(MEDIUM.getDiskMb(), LARGE.getDiskMb()),
+            "sla_disk_xlarge_", Range.openClosed(LARGE.getDiskMb(), XLARGE.getDiskMb()),
+            "sla_disk_xxlarge_", Range.greaterThan(XLARGE.getDiskMb())),
+        new Function<IScheduledTask, Long>() {
+          @Override
+          public Long apply(IScheduledTask task) {
+            return task.getAssignedTask().getTask().getDiskMb();
+          }
+        }
+    ));
+
+    // Assigned once per constant; final so the enum stays immutable.
+    private final SlaGroup group;
+
+    GroupType(SlaGroup group) {
+      this.group = group;
+    }
+
+    /**
+     * Returns the grouping implementation backing this constant.
+     *
+     * @return SLA group for this type.
+     */
+    SlaGroup getSlaGroup() {
+      return group;
+    }
+  }
+
+  /**
+   * Groups tasks by job. Group names have the form {@code "sla_" + canonical job key + "_"}.
+   */
+  class Job implements SlaGroup {
+    @Override
+    public Multimap<String, IScheduledTask> createNamedGroups(Iterable<IScheduledTask> tasks) {
+      return Multimaps.index(tasks, Functions.compose(new Function<IJobKey, String>() {
+        @Override
+        public String apply(IJobKey jobKey) {
+          return "sla_" + JobKeys.canonicalString(jobKey) + "_";
+        }
+      }, Tasks.SCHEDULED_TO_JOB_KEY));
+    }
+  }
+
+  /**
+   * Groups all tasks available in the cluster under the single name {@code "sla_cluster_"}.
+   */
+  class Cluster implements SlaGroup {
+    @Override
+    public Multimap<String, IScheduledTask> createNamedGroups(Iterable<IScheduledTask> tasks) {
+      return Multimaps.index(tasks, new Function<IScheduledTask, String>() {
+        @Override
+        public String apply(IScheduledTask task) {
+          return "sla_cluster_";
+        }
+      });
+    }
+  }
+
+  /**
+   * Groups all tasks by their specified resource value.
+   *
+   * <p>A task is placed in every named group whose range contains its resource value; tasks
+   * whose value falls outside all ranges are not included in any group.
+   *
+   * @param <T> Type of resource to group by.
+   */
+  final class Resource<T extends Number & Comparable<T>> implements SlaGroup {
+
+    private final Map<String, Range<T>> map;
+    private final Function<IScheduledTask, T> function;
+
+    private Resource(Map<String, Range<T>> map, Function<IScheduledTask, T> function) {
+      this.map = map;
+      this.function = function;
+    }
+
+    @Override
+    public Multimap<String, IScheduledTask> createNamedGroups(Iterable<IScheduledTask> tasks) {
+      ImmutableListMultimap.Builder<String, IScheduledTask> result =
+          ImmutableListMultimap.builder();
+
+      for (final Map.Entry<String, Range<T>> entry : map.entrySet()) {
+        // Iterables.filter is lazy but is fully consumed by putAll right here.
+        result.putAll(entry.getKey(), Iterables.filter(tasks, new Predicate<IScheduledTask>() {
+          @Override
+          public boolean apply(IScheduledTask task) {
+            return entry.getValue().contains(function.apply(task));
+          }
+        }));
+      }
+      return result.build();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/53dc494f/src/main/java/org/apache/aurora/scheduler/sla/SlaModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/sla/SlaModule.java b/src/main/java/org/apache/aurora/scheduler/sla/SlaModule.java
new file mode 100644
index 0000000..0c6a2b8
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/sla/SlaModule.java
@@ -0,0 +1,116 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.sla;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.inject.AbstractModule;
+import com.google.inject.BindingAnnotation;
+import com.google.inject.Singleton;
+import com.twitter.common.application.modules.LifecycleModule;
+import com.twitter.common.args.Arg;
+import com.twitter.common.args.CmdLine;
+import com.twitter.common.base.Command;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+
+import org.apache.aurora.scheduler.sla.MetricCalculator.MetricCalculatorSettings;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Binding module for the SLA processor.
+ */
+public class SlaModule extends AbstractModule {
+
+  private static final Logger LOG = Logger.getLogger(SlaModule.class.getName());
+
+  @CmdLine(name = "sla_stat_refresh_rate", help = "The SLA stat refresh rate.")
+  private static final Arg<Amount<Long, Time>> SLA_REFRESH_RATE =
+      Arg.create(Amount.of(1L, Time.MINUTES));
+
+  @BindingAnnotation
+  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+  private @interface SlaExecutor { }
+
+  @Override
+  protected void configure() {
+    // One daemon thread, so background stat calculation never blocks JVM shutdown.
+    final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(
+        1,
+        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("SlaStat-%d").build()) {
+
+      @Override
+      protected void afterExecute(Runnable runnable, @Nullable Throwable throwable) {
+        super.afterExecute(runnable, throwable);
+        if (throwable != null) {
+          LOG.log(Level.SEVERE, throwable.toString(), throwable);
+        } else if (runnable instanceof Future) {
+          // Scheduled tasks are wrapped in a Future that captures exceptions thrown by the task
+          // instead of passing them here, so inspect the completed future to surface failures.
+          try {
+            Future<?> future = (Future<?>) runnable;
+            if (future.isDone()) {
+              future.get();
+            }
+          } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+          } catch (ExecutionException ee) {
+            LOG.log(Level.SEVERE, ee.toString(), ee);
+          } catch (java.util.concurrent.CancellationException ce) {
+            // get() throws this for a cancelled task; don't let it escape afterExecute.
+            LOG.log(Level.WARNING, "SLA stat calculation was cancelled.", ce);
+          }
+        }
+      }
+    };
+
+    bind(MetricCalculatorSettings.class).toInstance(
+        new MetricCalculatorSettings(SLA_REFRESH_RATE.get().as(Time.MILLISECONDS)));
+
+    bind(MetricCalculator.class).in(Singleton.class);
+    bind(ScheduledExecutorService.class).annotatedWith(SlaExecutor.class).toInstance(executor);
+    LifecycleModule.bindStartupAction(binder(), SlaUpdater.class);
+  }
+
+  /**
+   * Startup action that schedules the {@link MetricCalculator} to run periodically at the
+   * configured refresh rate on the SLA executor.
+   */
+  static class SlaUpdater implements Command {
+    private final ScheduledExecutorService executor;
+    private final MetricCalculator calculator;
+
+    @Inject
+    SlaUpdater(@SlaExecutor ScheduledExecutorService executor, MetricCalculator calculator) {
+      this.executor = checkNotNull(executor);
+      this.calculator = checkNotNull(calculator);
+    }
+
+    @Override
+    public void execute() throws RuntimeException {
+      // Millisecond granularity, matching MetricCalculatorSettings: converting to seconds would
+      // truncate a sub-second rate to 0, which scheduleAtFixedRate rejects.
+      long interval = SLA_REFRESH_RATE.get().as(Time.MILLISECONDS);
+      executor.scheduleAtFixedRate(calculator, interval, interval, TimeUnit.MILLISECONDS);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/53dc494f/src/main/java/org/apache/aurora/scheduler/sla/SlaUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/sla/SlaUtil.java b/src/main/java/org/apache/aurora/scheduler/sla/SlaUtil.java
new file mode 100644
index 0000000..cf09582
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/sla/SlaUtil.java
@@ -0,0 +1,50 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.sla;
+
+import java.util.List;
+
+import com.google.common.collect.Ordering;
+
+/**
+ * Utility methods for the SLA calculations.
+ */
+final class SlaUtil {
+
+  private SlaUtil() {
+    // Utility class.
+  }
+
+  /**
+   * Reports the percentile value from the given list ordered in a non-descending order.
+   * Example: for [30, 60, 70, 90] the 75th percentile is 30 (i.e. 75% of elements are greater).
+   *
+   * <p>The list is sorted internally, so input order does not matter. Returns 0 for an empty
+   * list, and also when the requested percentile leaves no element to report (e.g. 100).
+   *
+   * @param list List to calculate percentile for.
+   * @param percentile Percentile value to apply.
+   * @return Element at the given percentile.
+   */
+  static Long percentile(List<Long> list, double percentile) {
+    if (list.isEmpty()) {
+      return 0L;
+    }
+
+    List<Long> sorted = Ordering.natural().immutableSortedCopy(list);
+    int total = sorted.size();
+    // Multiply before dividing: percentile / 100 is rarely exact in binary floating point
+    // (0.57 * 100 evaluates to 56.99999999999999), which would floor one element too low.
+    int percentileElements = (int) Math.floor(percentile * total / 100);
+    int index = total - percentileElements - 1;
+    return index >= 0 && index < total ? sorted.get(index) : 0L;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/53dc494f/src/main/java/org/apache/aurora/scheduler/stats/SlotSizeCounter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/stats/SlotSizeCounter.java b/src/main/java/org/apache/aurora/scheduler/stats/SlotSizeCounter.java
index b6610f7..966c498 100644
--- a/src/main/java/org/apache/aurora/scheduler/stats/SlotSizeCounter.java
+++ b/src/main/java/org/apache/aurora/scheduler/stats/SlotSizeCounter.java
@@ -26,8 +26,7 @@ import com.google.common.base.Predicate;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableMap;
-import org.apache.aurora.gen.ResourceAggregate;
-import org.apache.aurora.scheduler.quota.ResourceAggregates;
+import org.apache.aurora.scheduler.base.ResourceAggregates;
import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -38,10 +37,10 @@ import static com.google.common.base.Preconditions.checkNotNull;
*/
class SlotSizeCounter implements Runnable {
private static final Map<String, IResourceAggregate> SLOT_SIZES = ImmutableMap.of(
- "small", IResourceAggregate.build(new ResourceAggregate(1.0, 1024, 4096)),
- "medium", IResourceAggregate.build(new ResourceAggregate(4.0, 8192, 16384)),
- "large", IResourceAggregate.build(new ResourceAggregate(8.0, 16384, 32768)),
- "xlarge", IResourceAggregate.build(new ResourceAggregate(16.0, 32768, 65536)));
+ "small", ResourceAggregates.SMALL,
+ "medium", ResourceAggregates.MEDIUM,
+ "large", ResourceAggregates.LARGE,
+ "xlarge", ResourceAggregates.XLARGE);
private final Map<String, IResourceAggregate> slotSizes;
private final MachineResourceProvider machineResourceProvider;
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/53dc494f/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java
index 14af68f..ba67454 100644
--- a/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/quota/QuotaManagerImplTest.java
@@ -25,6 +25,7 @@ import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.gen.TaskConfig;
import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.ResourceAggregates;
import org.apache.aurora.scheduler.quota.QuotaManager.QuotaException;
import org.apache.aurora.scheduler.quota.QuotaManager.QuotaManagerImpl;
import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/53dc494f/src/test/java/org/apache/aurora/scheduler/sla/MetricCalculatorTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/sla/MetricCalculatorTest.java b/src/test/java/org/apache/aurora/scheduler/sla/MetricCalculatorTest.java
new file mode 100644
index 0000000..c5dcc06
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/sla/MetricCalculatorTest.java
@@ -0,0 +1,66 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.sla;
+
+import com.google.common.collect.ImmutableMap;
+import com.twitter.common.base.Supplier;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.Stat;
+import com.twitter.common.stats.StatsProvider;
+import com.twitter.common.testing.easymock.EasyMockTest;
+import com.twitter.common.util.testing.FakeClock;
+
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.sla.MetricCalculator.MetricCalculatorSettings;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.easymock.EasyMock.expect;
+
+/**
+ * Smoke test for {@link MetricCalculator}: runs one calculation pass over a few pending tasks
+ * and relies on EasyMock verification of the recorded expectations.
+ */
+public class MetricCalculatorTest extends EasyMockTest {
+
+  private final FakeClock clock = new FakeClock();
+  private StorageTestUtil storageUtil;
+  private MetricCalculator calculator;
+
+  @Before
+  public void setUp() throws Exception {
+    StatsProvider statsProvider = createMock(StatsProvider.class);
+    MetricCalculatorSettings settings = new MetricCalculatorSettings(10000);
+    storageUtil = new StorageTestUtil(this);
+    calculator = new MetricCalculator(storageUtil.storage, clock, settings, statsProvider);
+    // Argument matchers belong in argument positions only. anyObject() returns null anyway,
+    // but calling it here would also register a stray matcher that EasyMock can attach to the
+    // next recorded call, so return null explicitly.
+    expect(statsProvider.makeGauge(EasyMock.anyString(), EasyMock.<Supplier<Number>>anyObject()))
+        .andReturn((Stat<Number>) null)
+        .anyTimes();
+  }
+
+  @Test
+  public void runTest() {
+    clock.advance(Amount.of(10L, Time.SECONDS));
+    storageUtil.expectTaskFetch(Query.unscoped(),
+        SlaTestUtil.makeTask(ImmutableMap.of(clock.nowMillis() - 1000, PENDING), 0),
+        SlaTestUtil.makeTask(ImmutableMap.of(clock.nowMillis() - 2000, PENDING), 1),
+        SlaTestUtil.makeTask(ImmutableMap.of(clock.nowMillis() - 3000, PENDING), 2));
+    storageUtil.expectOperations();
+
+    control.replay();
+    calculator.run();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/53dc494f/src/test/java/org/apache/aurora/scheduler/sla/SlaAlgorithmTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/sla/SlaAlgorithmTest.java b/src/test/java/org/apache/aurora/scheduler/sla/SlaAlgorithmTest.java
new file mode 100644
index 0000000..92244b7
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/sla/SlaAlgorithmTest.java
@@ -0,0 +1,291 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.sla;
+
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Range;
+import com.google.common.collect.Sets;
+
+import org.apache.aurora.gen.ScheduleStatus;
+
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
+import static org.apache.aurora.gen.ScheduleStatus.INIT;
+import static org.apache.aurora.gen.ScheduleStatus.KILLED;
+import static org.apache.aurora.gen.ScheduleStatus.KILLING;
+import static org.apache.aurora.gen.ScheduleStatus.LOST;
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.gen.ScheduleStatus.RESTARTING;
+import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.apache.aurora.gen.ScheduleStatus.SANDBOX_DELETED;
+import static org.apache.aurora.gen.ScheduleStatus.STARTING;
+import static org.apache.aurora.scheduler.sla.SlaAlgorithm.AlgorithmType.AGGREGATE_PLATFORM_UPTIME;
+import static org.apache.aurora.scheduler.sla.SlaAlgorithm.AlgorithmType.JOB_UPTIME_50;
+import static org.apache.aurora.scheduler.sla.SlaAlgorithm.AlgorithmType.JOB_UPTIME_75;
+import static org.apache.aurora.scheduler.sla.SlaAlgorithm.AlgorithmType.JOB_UPTIME_90;
+import static org.apache.aurora.scheduler.sla.SlaAlgorithm.AlgorithmType.JOB_UPTIME_95;
+import static org.apache.aurora.scheduler.sla.SlaAlgorithm.AlgorithmType.JOB_UPTIME_99;
+import static org.apache.aurora.scheduler.sla.SlaAlgorithm.AlgorithmType.MEDIAN_TIME_TO_ASSIGNED;
+import static org.apache.aurora.scheduler.sla.SlaAlgorithm.AlgorithmType.MEDIAN_TIME_TO_RUNNING;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the pre-configured {@link SlaAlgorithm} implementations.
+ */
+public class SlaAlgorithmTest {
+
+  @Test
+  public void medianTimeToAssignedEvenTest() {
+    Number actual = MEDIAN_TIME_TO_ASSIGNED.getAlgorithm().calculate(
+        ImmutableSet.of(
+            makeTask(ImmutableMap.of(50L, PENDING, 200L, ASSIGNED, 250L, KILLED)),
+            makeTask(ImmutableMap.of(100L, PENDING, 200L, ASSIGNED, 300L, RUNNING)),
+            makeTask(ImmutableMap.of(200L, PENDING, 250L, ASSIGNED, 350L, STARTING))),
+        Range.<Long>all());
+    assertEquals(50L, actual);
+  }
+
+  @Test
+  public void medianTimeToAssignedOddTest() {
+    Number actual = MEDIAN_TIME_TO_ASSIGNED.getAlgorithm().calculate(
+        ImmutableSet.of(
+            makeTask(ImmutableMap.of(50L, PENDING, 200L, ASSIGNED, 250L, RUNNING)),
+            makeTask(ImmutableMap.of(100L, PENDING, 200L, ASSIGNED, 300L, RUNNING)),
+            makeTask(ImmutableMap.of(200L, PENDING, 250L, ASSIGNED, 350L, STARTING))),
+        Range.<Long>all());
+    assertEquals(100L, actual);
+  }
+
+  @Test
+  public void medianTimeToAssignedZeroTest() {
+    Number actual = MEDIAN_TIME_TO_ASSIGNED.getAlgorithm().calculate(
+        ImmutableSet.of(
+            makeTask(ImmutableMap.of(50L, PENDING)),
+            makeTask(ImmutableMap.of(100L, PENDING, 200L, ASSIGNED, 300L, KILLED))),
+        Range.<Long>all());
+    assertEquals(0L, actual);
+  }
+
+  @Test
+  public void medianTimeToAssignedOneTest() {
+    Number actual = MEDIAN_TIME_TO_ASSIGNED.getAlgorithm().calculate(
+        ImmutableSet.of(
+            makeTask(ImmutableMap.of(50L, PENDING)),
+            makeTask(ImmutableMap.of(100L, PENDING, 200L, ASSIGNED))),
+        Range.<Long>all());
+    assertEquals(100L, actual);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void medianTimeToAssignedNoPendingTest() {
+    MEDIAN_TIME_TO_ASSIGNED.getAlgorithm().calculate(
+        ImmutableSet.of(
+            makeTask(ImmutableMap.of(50L, ASSIGNED))),
+        Range.<Long>all());
+  }
+
+  @Test
+  public void medianTimeToRunningEvenTest() {
+    Number actual = MEDIAN_TIME_TO_RUNNING.getAlgorithm().calculate(
+        ImmutableSet.of(
+            makeTask(ImmutableMap.of(50L, PENDING)), // Ignored as not RUNNING
+            makeTask(ImmutableMap.of(50L, PENDING, 100L, ASSIGNED, 150L, STARTING, 180L, RUNNING)),
+            makeTask(ImmutableMap.of(100L, PENDING, 200L, ASSIGNED, 300L, STARTING, 400L, RUNNING)),
+            makeTask(ImmutableMap.of(
+                50L, PENDING,
+                100L, ASSIGNED,
+                150L, STARTING,
+                200L, RUNNING,
+                300L, KILLED))), // Ignored due to being terminal.
+        Range.<Long>all());
+    assertEquals(130L, actual);
+  }
+
+  @Test
+  public void medianTimeToRunningOddTest() {
+    Number actual = MEDIAN_TIME_TO_RUNNING.getAlgorithm().calculate(
+        ImmutableSet.of(
+            makeTask(ImmutableMap.of(50L, PENDING)), // Ignored as not RUNNING
+            makeTask(ImmutableMap.of(50L, PENDING, 100L, ASSIGNED, 150L, STARTING, 180L, RUNNING)),
+            makeTask(ImmutableMap.of(100L, PENDING, 200L, ASSIGNED, 300L, STARTING, 400L, RUNNING)),
+            makeTask(ImmutableMap.of(50L, PENDING, 100L, ASSIGNED, 150L, STARTING, 200L, RUNNING))),
+        Range.<Long>all());
+    assertEquals(150L, actual);
+  }
+
+  @Test
+  public void medianTimeToRunningZeroTest() {
+    Number actual = MEDIAN_TIME_TO_RUNNING.getAlgorithm().calculate(
+        ImmutableSet.of(
+            makeTask(ImmutableMap.of(50L, PENDING)),
+            makeTask(ImmutableMap.of(50L, PENDING, 100L, RUNNING, 200L, KILLED))),
+        Range.<Long>all());
+    assertEquals(0L, actual);
+  }
+
+  @Test
+  public void jobUptime50Test() {
+    long now = System.currentTimeMillis();
+    Number actual = JOB_UPTIME_50.getAlgorithm().calculate(
+        makeUptimeTasks(100, now),
+        Range.closed(0L, now));
+    assertEquals(50, actual);
+  }
+
+  @Test
+  public void jobUptime75Test() {
+    long now = System.currentTimeMillis();
+    Number actual = JOB_UPTIME_75.getAlgorithm().calculate(
+        makeUptimeTasks(100, now),
+        Range.closed(0L, now));
+    assertEquals(25, actual);
+  }
+
+  @Test
+  public void jobUptime90Test() {
+    long now = System.currentTimeMillis();
+    Number actual = JOB_UPTIME_90.getAlgorithm().calculate(
+        makeUptimeTasks(100, now),
+        Range.closed(0L, now));
+    assertEquals(10, actual);
+  }
+
+  @Test
+  public void jobUptime95Test() {
+    long now = System.currentTimeMillis();
+    Number actual = JOB_UPTIME_95.getAlgorithm().calculate(
+        makeUptimeTasks(100, now),
+        Range.closed(0L, now));
+    assertEquals(5, actual);
+  }
+
+  @Test
+  public void jobUptime99Test() {
+    long now = System.currentTimeMillis();
+    Number actual = JOB_UPTIME_99.getAlgorithm().calculate(
+        makeUptimeTasks(100, now),
+        Range.closed(0L, now));
+    assertEquals(1, actual);
+  }
+
+  @Test
+  public void jobUptimeEmptyTest() {
+    long now = System.currentTimeMillis();
+    Number actual = JOB_UPTIME_99.getAlgorithm().calculate(
+        new LinkedList<IScheduledTask>(),
+        Range.closed(0L, now));
+    assertEquals(0, actual);
+  }
+
+  @Test
+  public void jobUptimeTerminalIgnoredTest() {
+    long now = System.currentTimeMillis();
+    Set<IScheduledTask> instances = makeUptimeTasks(100, now);
+    // A task that has already reached a terminal state must not affect the result.
+    instances.add(makeTask(ImmutableMap.of(now - 5000, RUNNING, now - 3000, KILLED)));
+    Number actual = JOB_UPTIME_99.getAlgorithm().calculate(instances, Range.closed(0L, now));
+    assertEquals(1, actual);
+  }
+
+  @Test
+  public void jobUptimeLiveNonTerminalIgnoredTest() {
+    long now = System.currentTimeMillis();
+    Set<IScheduledTask> instances = makeUptimeTasks(100, now);
+    // A live task that is no longer RUNNING (here RESTARTING) must not affect the result.
+    instances.add(makeTask(ImmutableMap.of(now - 5000, RUNNING, now - 3000, RESTARTING)));
+    Number actual = JOB_UPTIME_99.getAlgorithm().calculate(instances, Range.closed(0L, now));
+    assertEquals(1, actual);
+  }
+
+  @Test
+  public void aggregatePlatformUptimeTest() {
+    Number actual = AGGREGATE_PLATFORM_UPTIME.getAlgorithm().calculate(
+        ImmutableSet.of(
+            makeTask(ImmutableMap.of(100L, PENDING), 0), // Ignored.
+            makeTask(ImmutableMap.of(
+                100L, PENDING,
+                200L, ASSIGNED,
+                300L, STARTING,
+                400L, RUNNING), 1), // 100% uptime.
+            makeTask(ImmutableMap.<Long, ScheduleStatus>builder()
+                .put(5L, INIT)
+                .put(10L, PENDING)
+                .put(20L, ASSIGNED)
+                .put(30L, STARTING)
+                .put(50L, RUNNING)
+                .put(400L, KILLING)
+                .put(450L, KILLED).build(), 2)), // 100% uptime.
+        Range.closedOpen(100L, 500L));
+    assertEquals(100.0, actual);
+  }
+
+  @Test
+  public void aggregatePlatformUptimeRecoveredFromDownTest() {
+    Number actual = AGGREGATE_PLATFORM_UPTIME.getAlgorithm().calculate(
+        ImmutableSet.of(
+            makeTask(ImmutableMap.of(50L, RUNNING, 300L, LOST, 310L, KILLED), 0), // DOWN mid range.
+            makeTask(ImmutableMap.of(
+                320L, PENDING,
+                330L, ASSIGNED,
+                350L, STARTING,
+                400L, RUNNING), 0)), // Recovered within range.
+        Range.closedOpen(100L, 500L));
+    assertEquals(75.0, actual);
+  }
+
+  @Test
+  public void aggregatePlatformUptimeKilledByPlatformTest() {
+    Number actual = AGGREGATE_PLATFORM_UPTIME.getAlgorithm().calculate(
+        ImmutableSet.of(makeTask(ImmutableMap.of(50L, RUNNING, 300L, KILLED), 0)),
+        Range.closedOpen(100L, 500L));
+    assertEquals(50.0, actual);
+  }
+
+  @Test
+  public void aggregatePlatformUptimeSandboxDeletedIgnoredTest() {
+    Number actual = AGGREGATE_PLATFORM_UPTIME.getAlgorithm().calculate(
+        ImmutableSet.of(
+            makeTask(ImmutableMap.of(50L, RUNNING, 300L, LOST, 400L, SANDBOX_DELETED), 0)),
+        Range.closedOpen(100L, 500L));
+    assertEquals(50.0, actual);
+  }
+
+  @Test
+  public void aggregatePlatformUptimeEmptyTest() {
+    Number actual = AGGREGATE_PLATFORM_UPTIME.getAlgorithm().calculate(
+        ImmutableSet.of(makeTask(ImmutableMap.of(50L, PENDING), 0)),
+        Range.closedOpen(100L, 500L));
+    assertEquals(100.0, actual);
+  }
+
+  /**
+   * Creates {@code num} RUNNING tasks (all instance 0) that started 1, 2, ..., {@code num}
+   * seconds before {@code now}.
+   */
+  private static Set<IScheduledTask> makeUptimeTasks(int num, long now) {
+    Set<IScheduledTask> instances = Sets.newHashSet();
+    for (int i = 0; i < num; i++) {
+      instances.add(makeTask(ImmutableMap.of(now - (i + 1) * 1000, RUNNING)));
+    }
+    return instances;
+  }
+
+  private static IScheduledTask makeTask(Map<Long, ScheduleStatus> events) {
+    return makeTask(events, 0);
+  }
+
+  private static IScheduledTask makeTask(Map<Long, ScheduleStatus> events, int instanceId) {
+    return SlaTestUtil.makeTask(events, instanceId);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/53dc494f/src/test/java/org/apache/aurora/scheduler/sla/SlaTestUtil.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/sla/SlaTestUtil.java b/src/test/java/org/apache/aurora/scheduler/sla/SlaTestUtil.java
new file mode 100644
index 0000000..b6d4774
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/sla/SlaTestUtil.java
@@ -0,0 +1,65 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.sla;
+
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.Identity;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.gen.TaskEvent;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskEvent;
+
+/**
+ * Test helpers for building {@link IScheduledTask} instances with a scripted event history.
+ */
+public final class SlaTestUtil {
+
+  private SlaTestUtil() {
+    // Utility class.
+  }
+
+  /**
+   * Builds a production service task (owner role {@code role}, environment {@code env}, job
+   * {@code job}) whose event history comes from {@code events} and whose current status is
+   * the status of the last event in the map's iteration order.
+   *
+   * @param events Event timestamps mapped to statuses, in the desired history order.
+   * @param instanceId Instance ID to assign to the task.
+   * @return Immutable scheduled task.
+   */
+  public static IScheduledTask makeTask(Map<Long, ScheduleStatus> events, int instanceId) {
+    List<ITaskEvent> history = makeEvents(events);
+    ScheduleStatus currentStatus = Iterables.getLast(history).getStatus();
+
+    TaskConfig config = new TaskConfig()
+        .setJobName("job")
+        .setIsService(true)
+        .setProduction(true)
+        .setEnvironment("env")
+        .setOwner(new Identity("role", "role-user"));
+    AssignedTask assigned = new AssignedTask()
+        .setTaskId("task_Id")
+        .setSlaveHost("host")
+        .setInstanceId(instanceId)
+        .setTask(config);
+
+    ScheduledTask scheduled = new ScheduledTask()
+        .setStatus(currentStatus)
+        .setTaskEvents(ITaskEvent.toBuildersList(history))
+        .setAssignedTask(assigned);
+    return IScheduledTask.build(scheduled);
+  }
+
+  /**
+   * Converts a timestamp-to-status map into an immutable list of task events, preserving the
+   * map's iteration order.
+   *
+   * @param events Event timestamps mapped to statuses.
+   * @return Task events in iteration order.
+   */
+  public static List<ITaskEvent> makeEvents(Map<Long, ScheduleStatus> events) {
+    ImmutableList.Builder<ITaskEvent> history = ImmutableList.builder();
+    for (Map.Entry<Long, ScheduleStatus> event : events.entrySet()) {
+      TaskEvent thriftEvent = new TaskEvent(event.getKey(), event.getValue());
+      history.add(ITaskEvent.build(thriftEvent));
+    }
+    return history.build();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/53dc494f/src/test/java/org/apache/aurora/scheduler/stats/SlotSizeCounterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/stats/SlotSizeCounterTest.java b/src/test/java/org/apache/aurora/scheduler/stats/SlotSizeCounterTest.java
index 415b48c..c732f57 100644
--- a/src/test/java/org/apache/aurora/scheduler/stats/SlotSizeCounterTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/stats/SlotSizeCounterTest.java
@@ -24,7 +24,7 @@ import com.twitter.common.stats.StatsProvider;
import com.twitter.common.testing.easymock.EasyMockTest;
import org.apache.aurora.gen.ResourceAggregate;
-import org.apache.aurora.scheduler.quota.ResourceAggregates;
+import org.apache.aurora.scheduler.base.ResourceAggregates;
import org.apache.aurora.scheduler.stats.SlotSizeCounter.MachineResource;
import org.apache.aurora.scheduler.stats.SlotSizeCounter.MachineResourceProvider;
import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/53dc494f/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplTest.java
index 4a13dbc..b985fbb 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplTest.java
@@ -36,7 +36,7 @@ import org.apache.aurora.gen.storage.Snapshot;
import org.apache.aurora.gen.storage.StoredJob;
import org.apache.aurora.scheduler.base.JobKeys;
import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.quota.ResourceAggregates;
+import org.apache.aurora.scheduler.base.ResourceAggregates;
import org.apache.aurora.scheduler.storage.SnapshotStore;
import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
import org.apache.aurora.scheduler.storage.entities.ILock;