You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/01/29 20:11:50 UTC
git commit: Work in progress, still updating tests.
Updated Branches:
refs/heads/wfarner/AURORA-139 [created] ab5395b3e
Work in progress, still updating tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/ab5395b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/ab5395b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/ab5395b3
Branch: refs/heads/wfarner/AURORA-139
Commit: ab5395b3e345f075cad39689325d2f514154934a
Parents: 3870df3
Author: Bill Farner <wf...@apache.org>
Authored: Wed Jan 29 11:11:34 2014 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Wed Jan 29 11:11:34 2014 -0800
----------------------------------------------------------------------
.../aurora/scheduler/async/TaskScheduler.java | 21 ++-
.../scheduler/filter/AttributeFilter.java | 9 +-
.../aurora/scheduler/filter/CachedJobState.java | 47 +++++
.../scheduler/filter/ConstraintFilter.java | 19 +-
.../scheduler/filter/SchedulingFilter.java | 8 +-
.../scheduler/filter/SchedulingFilterImpl.java | 38 ++--
.../aurora/scheduler/state/TaskAssigner.java | 13 +-
.../filter/SchedulingFilterImplTest.java | 180 ++++++++++---------
8 files changed, 196 insertions(+), 139 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ab5395b3/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
index 4afc332..75843ad 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
@@ -17,6 +17,7 @@ package org.apache.aurora.scheduler.async;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;
+import java.util.EnumSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
@@ -31,6 +32,7 @@ import com.google.common.base.Ticker;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
import com.google.common.eventbus.Subscribe;
import com.google.inject.BindingAnnotation;
import com.twitter.common.inject.TimedInterceptor.Timed;
@@ -40,14 +42,18 @@ import com.twitter.common.stats.StatImpl;
import com.twitter.common.stats.Stats;
import com.twitter.common.util.Clock;
+import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import org.apache.aurora.scheduler.filter.CachedJobState;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.state.TaskAssigner;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
import org.apache.aurora.scheduler.storage.Storage.MutateWork;
+import org.apache.aurora.scheduler.storage.TaskStore;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.mesos.Protos.Offer;
import org.apache.mesos.Protos.SlaveID;
@@ -126,24 +132,32 @@ interface TaskScheduler extends EventSubscriber {
this.reservations = new Reservations(reservationDuration, clock);
}
+ private static final Iterable<ScheduleStatus> ACTIVE_NOT_PENDING_STATES =
+ EnumSet.copyOf(Sets.difference(Tasks.ACTIVE_STATES, EnumSet.of(ScheduleStatus.PENDING)));
+
private Function<Offer, Optional<TaskInfo>> getAssignerFunction(
+ TaskStore taskStore,
final String taskId,
final IScheduledTask task) {
+ final CachedJobState cachedJobState = new CachedJobState(taskStore.fetchTasks(
+ Query.jobScoped(Tasks.SCHEDULED_TO_JOB_KEY.apply(task))
+ .byStatus(ACTIVE_NOT_PENDING_STATES)));
+
return new Function<Offer, Optional<TaskInfo>>() {
@Override public Optional<TaskInfo> apply(Offer offer) {
Optional<String> reservedTaskId = reservations.getSlaveReservation(offer.getSlaveId());
if (reservedTaskId.isPresent()) {
if (taskId.equals(reservedTaskId.get())) {
// Slave is reserved to satisfy this task.
- return assigner.maybeAssign(offer, task);
+ return assigner.maybeAssign(offer, task, cachedJobState);
} else {
// Slave is reserved for another task.
return Optional.absent();
}
} else {
// Slave is not reserved.
- return assigner.maybeAssign(offer, task);
+ return assigner.maybeAssign(offer, task, cachedJobState);
}
}
};
@@ -168,7 +182,8 @@ interface TaskScheduler extends EventSubscriber {
LOG.warning("Failed to look up task " + taskId + ", it may have been deleted.");
} else {
try {
- if (!offerQueue.launchFirst(getAssignerFunction(taskId, task))) {
+ if (!offerQueue.launchFirst(
+ getAssignerFunction(store.getTaskStore(), taskId, task))) {
// Task could not be scheduled.
maybePreemptFor(taskId);
return TaskSchedulerResult.TRY_AGAIN;
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ab5395b3/src/main/java/org/apache/aurora/scheduler/filter/AttributeFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/AttributeFilter.java b/src/main/java/org/apache/aurora/scheduler/filter/AttributeFilter.java
index 0816d3f..5fe59c8 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/AttributeFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/AttributeFilter.java
@@ -24,7 +24,6 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.apache.aurora.gen.Attribute;
-import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.filter.SchedulingFilterImpl.AttributeLoader;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -64,21 +63,16 @@ final class AttributeFilter {
* Tests whether an attribute matches a limit constraint.
*
* @param attributes Attributes to match against.
- * @param jobKey Key of the job with the limited constraint.
* @param limit Limit value.
* @param activeTasks All active tasks in the system.
* @param attributeFetcher Interface for fetching attributes for hosts in the system.
* @return {@code true} if the limit constraint is satisfied, {@code false} otherwise.
*/
static boolean matches(final Set<Attribute> attributes,
- final IJobKey jobKey,
int limit,
Iterable<IScheduledTask> activeTasks,
final AttributeLoader attributeFetcher) {
- Predicate<IScheduledTask> sameJob =
- Predicates.compose(Predicates.equalTo(jobKey), Tasks.SCHEDULED_TO_JOB_KEY);
-
Predicate<IScheduledTask> hasAttribute = new Predicate<IScheduledTask>() {
@Override public boolean apply(IScheduledTask task) {
Iterable<Attribute> hostAttributes =
@@ -87,7 +81,6 @@ final class AttributeFilter {
}
};
- return limit > Iterables.size(
- Iterables.filter(activeTasks, Predicates.and(sameJob, hasAttribute)));
+ return limit > Iterables.size(Iterables.filter(activeTasks, hasAttribute));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ab5395b3/src/main/java/org/apache/aurora/scheduler/filter/CachedJobState.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/CachedJobState.java b/src/main/java/org/apache/aurora/scheduler/filter/CachedJobState.java
new file mode 100644
index 0000000..7d764b9
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/filter/CachedJobState.java
@@ -0,0 +1,47 @@
+/**
+ * Copyright 2014 Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.filter;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+
+/**
+ * Immutable holder for a precomputed set of a job's active tasks. The set is supplied once at
+ * construction (a {@code null} set is rejected) and handed back unchanged by the getter.
+ *
+ * TODO(wfarner): Take this caching one step further and don't store tasks here, but instead store
+ * pre-computed aggregates of host attributes. For example, translate each IScheduledTask into
+ * the HostAttributes of the machine it resides on, and count the number of times each attribute is
+ * seen. This could be represented in a {@code Map<String, Multiset<String>>}, where
+ * the outer key is the attribute name, and the inner key is attribute values. So, for a job with
+ * two tasks on the same rack but different hosts, you could have this aggregate:
+ * <pre>
+ * { "host": {"hostA": 1, "hostB": 1},
+ * "rack": {"rack1": 2}
+ * }
+ * </pre>
+ */
+public class CachedJobState {
+
+ private final ImmutableSet<IScheduledTask> activeTasks;
+
+ public CachedJobState(ImmutableSet<IScheduledTask> activeTasks) {
+ this.activeTasks = Preconditions.checkNotNull(activeTasks);
+ }
+
+ public ImmutableSet<IScheduledTask> getActiveTasks() {
+ return activeTasks;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ab5395b3/src/main/java/org/apache/aurora/scheduler/filter/ConstraintFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/ConstraintFilter.java b/src/main/java/org/apache/aurora/scheduler/filter/ConstraintFilter.java
index 17fc8b9..8c2313c 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/ConstraintFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/ConstraintFilter.java
@@ -15,7 +15,6 @@
*/
package org.apache.aurora.scheduler.filter;
-import java.util.Collection;
import java.util.Set;
import java.util.logging.Logger;
@@ -23,7 +22,6 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Predicate;
-import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
@@ -32,8 +30,6 @@ import org.apache.aurora.scheduler.base.SchedulerException;
import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
import org.apache.aurora.scheduler.filter.SchedulingFilterImpl.AttributeLoader;
import org.apache.aurora.scheduler.storage.entities.IConstraint;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.entities.ITaskConstraint;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -45,27 +41,23 @@ class ConstraintFilter implements Function<IConstraint, Optional<Veto>> {
private static final Logger LOG = Logger.getLogger(ConstraintFilter.class.getName());
- private final IJobKey jobKey;
- private final Supplier<Collection<IScheduledTask>> activeTasksSupplier;
+ private final CachedJobState cachedjobState;
private final AttributeLoader attributeLoader;
private final Iterable<Attribute> hostAttributes;
/**
* Creates a new constraint filer for a given job.
*
- * @param jobKey Key for the job.
- * @param activeTasksSupplier Supplier to fetch active tasks (if necessary).
+ * @param cachedjobState Cached information about the job containing the task being matched.
* @param attributeLoader Interface to fetch host attributes (if necessary).
* @param hostAttributes The attributes of the host to test against.
*/
ConstraintFilter(
- IJobKey jobKey,
- Supplier<Collection<IScheduledTask>> activeTasksSupplier,
+ CachedJobState cachedjobState,
AttributeLoader attributeLoader,
Iterable<Attribute> hostAttributes) {
- this.jobKey = checkNotNull(jobKey);
- this.activeTasksSupplier = checkNotNull(activeTasksSupplier);
+ this.cachedjobState = checkNotNull(cachedjobState);
this.attributeLoader = checkNotNull(attributeLoader);
this.hostAttributes = checkNotNull(hostAttributes);
}
@@ -106,9 +98,8 @@ class ConstraintFilter implements Function<IConstraint, Optional<Veto>> {
boolean satisfied = AttributeFilter.matches(
attributes,
- jobKey,
taskConstraint.getLimit().getLimit(),
- activeTasksSupplier.get(),
+ cachedjobState.getActiveTasks(),
attributeLoader);
return satisfied
? Optional.<Veto>absent()
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ab5395b3/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
index 999e0f7..f244c35 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
@@ -109,8 +109,14 @@ public interface SchedulingFilter {
* @param slaveHost Host that the resources are associated with.
* @param task Task.
* @param taskId Canonical ID of the task.
+ * @param cachedjobState Cached information about the job containing {@code task}.
* @return A set of vetoes indicating reasons the task cannot be scheduled. If the task may be
* scheduled, the set will be empty.
*/
- Set<Veto> filter(ResourceSlot offer, String slaveHost, ITaskConfig task, String taskId);
+ Set<Veto> filter(
+ ResourceSlot offer,
+ String slaveHost,
+ ITaskConfig task,
+ String taskId,
+ CachedJobState cachedjobState);
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ab5395b3/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
index 4ba1483..784dde6 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
@@ -15,7 +15,6 @@
*/
package org.apache.aurora.scheduler.filter;
-import java.util.Collection;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.Set;
@@ -25,23 +24,17 @@ import javax.inject.Inject;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Optional;
-import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Ordering;
-import com.google.common.collect.Sets;
import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Data;
import org.apache.aurora.gen.Attribute;
import org.apache.aurora.gen.MaintenanceMode;
-import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.gen.TaskConstraint;
import org.apache.aurora.scheduler.ResourceSlot;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.configuration.ConfigurationManager;
import org.apache.aurora.scheduler.state.MaintenanceController;
import org.apache.aurora.scheduler.storage.AttributeStore;
@@ -49,7 +42,6 @@ import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
import org.apache.aurora.scheduler.storage.Storage.Work.Quiet;
import org.apache.aurora.scheduler.storage.entities.IConstraint;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -190,7 +182,8 @@ public class SchedulingFilterImpl implements SchedulingFilter {
private static final Ordering<IConstraint> VALUES_FIRST = Ordering.from(
new Comparator<IConstraint>() {
- @Override public int compare(IConstraint a, IConstraint b) {
+ @Override
+ public int compare(IConstraint a, IConstraint b) {
if (a.getConstraint().getSetField() == b.getConstraint().getSetField()) {
return 0;
}
@@ -198,10 +191,7 @@ public class SchedulingFilterImpl implements SchedulingFilter {
}
});
- private static final Iterable<ScheduleStatus> ACTIVE_NOT_PENDING_STATES =
- EnumSet.copyOf(Sets.difference(Tasks.ACTIVE_STATES, EnumSet.of(ScheduleStatus.PENDING)));
-
- private FilterRule getConstraintFilter(final String slaveHost) {
+ private FilterRule getConstraintFilter(final CachedJobState jobState, final String slaveHost) {
return new FilterRule() {
@Override public Iterable<Veto> apply(final ITaskConfig task) {
if (!task.isSetConstraints()) {
@@ -220,18 +210,8 @@ public class SchedulingFilterImpl implements SchedulingFilter {
}
};
- Supplier<Collection<IScheduledTask>> activeTasksSupplier =
- Suppliers.memoize(new Supplier<Collection<IScheduledTask>>() {
- @Override public Collection<IScheduledTask> get() {
- return storeProvider.getTaskStore().fetchTasks(
- Query.jobScoped(Tasks.INFO_TO_JOB_KEY.apply(task))
- .byStatus(ACTIVE_NOT_PENDING_STATES));
- }
- });
-
ConstraintFilter constraintFilter = new ConstraintFilter(
- Tasks.INFO_TO_JOB_KEY.apply(task),
- activeTasksSupplier,
+ jobState,
attributeLoader,
attributeLoader.apply(slaveHost));
ImmutableList.Builder<Veto> vetoes = ImmutableList.builder();
@@ -281,12 +261,18 @@ public class SchedulingFilterImpl implements SchedulingFilter {
}
@Override
- public Set<Veto> filter(ResourceSlot offer, String slaveHost, ITaskConfig task, String taskId) {
+ public Set<Veto> filter(
+ ResourceSlot offer,
+ String slaveHost,
+ ITaskConfig task,
+ String taskId,
+ CachedJobState cachedJobState) {
+
if (!ConfigurationManager.isDedicated(task) && isDedicated(slaveHost)) {
return ImmutableSet.of(DEDICATED_HOST_VETO);
}
return ImmutableSet.<Veto>builder()
- .addAll(getConstraintFilter(slaveHost).apply(task))
+ .addAll(getConstraintFilter(cachedJobState, slaveHost).apply(task))
.addAll(getResourceVetoes(offer, task))
.addAll(getMaintenanceVeto(slaveHost).asSet())
.build();
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ab5395b3/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
index d8f326e..3154ef0 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
@@ -26,6 +26,7 @@ import org.apache.aurora.scheduler.MesosTaskFactory;
import org.apache.aurora.scheduler.ResourceSlot;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.configuration.Resources;
+import org.apache.aurora.scheduler.filter.CachedJobState;
import org.apache.aurora.scheduler.filter.SchedulingFilter;
import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
@@ -46,9 +47,10 @@ public interface TaskAssigner {
*
* @param offer The resource offer.
* @param task The task to match against and optionally assign.
+ * @param cachedJobState Cached information about the job containing {@code task}.
* @return Instructions for launching the task if matching and assignment were successful.
*/
- Optional<TaskInfo> maybeAssign(Offer offer, IScheduledTask task);
+ Optional<TaskInfo> maybeAssign(Offer offer, IScheduledTask task, CachedJobState cachedJobState);
class TaskAssignerImpl implements TaskAssigner {
private static final Logger LOG = Logger.getLogger(TaskAssignerImpl.class.getName());
@@ -83,12 +85,17 @@ public interface TaskAssigner {
}
@Override
- public Optional<TaskInfo> maybeAssign(Offer offer, IScheduledTask task) {
+ public Optional<TaskInfo> maybeAssign(
+ Offer offer,
+ IScheduledTask task,
+ CachedJobState cachedJobState) {
+
Set<Veto> vetoes = filter.filter(
ResourceSlot.from(offer),
offer.getHostname(),
task.getAssignedTask().getTask(),
- Tasks.id(task));
+ Tasks.id(task),
+ cachedJobState);
if (vetoes.isEmpty()) {
return Optional.of(assign(offer, task));
} else {
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/ab5395b3/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
index f3df73c..b097098 100644
--- a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
@@ -39,7 +39,6 @@ import org.apache.aurora.gen.TaskConfig;
import org.apache.aurora.gen.TaskConstraint;
import org.apache.aurora.gen.ValueConstraint;
import org.apache.aurora.scheduler.ResourceSlot;
-import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.configuration.ConfigurationManager;
import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
import org.apache.aurora.scheduler.state.MaintenanceController;
@@ -47,7 +46,6 @@ import org.apache.aurora.scheduler.storage.AttributeStore;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
import org.apache.aurora.scheduler.storage.Storage.Work.Quiet;
-import org.apache.aurora.scheduler.storage.TaskStore;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
import org.easymock.EasyMock;
@@ -64,7 +62,6 @@ import static org.apache.aurora.scheduler.filter.SchedulingFilterImpl.ResourceVe
import static org.apache.aurora.scheduler.filter.SchedulingFilterImpl.ResourceVector.DISK;
import static org.apache.aurora.scheduler.filter.SchedulingFilterImpl.ResourceVector.PORTS;
import static org.apache.aurora.scheduler.filter.SchedulingFilterImpl.ResourceVector.RAM;
-import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.expect;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -103,13 +100,15 @@ public class SchedulingFilterImplTest extends EasyMockTest {
Amount.of(DEFAULT_DISK, Data.MB),
0);
+ private static final CachedJobState EMPTY_JOB =
+ new CachedJobState(ImmutableSet.<IScheduledTask>of());
+
private final AtomicLong taskIdCounter = new AtomicLong();
private SchedulingFilter defaultFilter;
private MaintenanceController maintenance;
private Storage storage;
private StoreProvider storeProvider;
- private TaskStore.Mutable taskStore;
private AttributeStore.Mutable attributeStore;
@Before
@@ -118,13 +117,11 @@ public class SchedulingFilterImplTest extends EasyMockTest {
maintenance = createMock(MaintenanceController.class);
defaultFilter = new SchedulingFilterImpl(storage, maintenance);
storeProvider = createMock(StoreProvider.class);
- taskStore = createMock(TaskStore.Mutable.class);
attributeStore = createMock(AttributeStore.Mutable.class);
// Link the store provider to the store mocks.
expectReads();
- expect(storeProvider.getTaskStore()).andReturn(taskStore).anyTimes();
expect(storeProvider.getAttributeStore()).andReturn(attributeStore).anyTimes();
}
@@ -142,19 +139,17 @@ public class SchedulingFilterImplTest extends EasyMockTest {
public void testMeetsOffer() throws Exception {
expectGetHostAttributes(HOST_A, host(HOST_A), rack(RACK_A)).atLeastOnce();
expectGetHostMaintenanceStatus(HOST_A).times(2);
- expectGetTasks().times(2);
control.replay();
- assertNoVetoes(makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK));
- assertNoVetoes(makeTask(DEFAULT_CPUS - 1, DEFAULT_RAM - 1, DEFAULT_DISK - 1));
+ assertNoVetoes(makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK), EMPTY_JOB);
+ assertNoVetoes(makeTask(DEFAULT_CPUS - 1, DEFAULT_RAM - 1, DEFAULT_DISK - 1), EMPTY_JOB);
}
@Test
public void testSufficientPorts() throws Exception {
expectGetHostAttributes(HOST_A, host(HOST_A), rack(RACK_A)).atLeastOnce();
expectGetHostMaintenanceStatus(HOST_A).times(4);
- expectGetTasks().times(4);
control.replay();
@@ -178,28 +173,28 @@ public class SchedulingFilterImplTest extends EasyMockTest {
.setRequestedPorts(ImmutableSet.of("one", "two", "three")));
Set<Veto> none = ImmutableSet.of();
- assertEquals(none, defaultFilter.filter(twoPorts, HOST_A, noPortTask, TASK_ID));
- assertEquals(none, defaultFilter.filter(twoPorts, HOST_A, onePortTask, TASK_ID));
- assertEquals(none, defaultFilter.filter(twoPorts, HOST_A, twoPortTask, TASK_ID));
+ assertEquals(none, defaultFilter.filter(twoPorts, HOST_A, noPortTask, TASK_ID, EMPTY_JOB));
+ assertEquals(none, defaultFilter.filter(twoPorts, HOST_A, onePortTask, TASK_ID, EMPTY_JOB));
+ assertEquals(none, defaultFilter.filter(twoPorts, HOST_A, twoPortTask, TASK_ID, EMPTY_JOB));
assertEquals(
ImmutableSet.of(PORTS.veto(1)),
- defaultFilter.filter(twoPorts, HOST_A, threePortTask, TASK_ID));
+ defaultFilter.filter(twoPorts, HOST_A, threePortTask, TASK_ID, EMPTY_JOB));
}
@Test
public void testInsufficientResources() throws Exception {
expectGetHostAttributes(HOST_A, host(HOST_A), rack(RACK_A)).atLeastOnce();
expectGetHostMaintenanceStatus(HOST_A).times(4);
- expectGetTasks().times(4);
control.replay();
assertVetoes(
makeTask(DEFAULT_CPUS + 1, DEFAULT_RAM + 1, DEFAULT_DISK + 1),
+ EMPTY_JOB,
CPU.veto(1), DISK.veto(1), RAM.veto(1));
- assertVetoes(makeTask(DEFAULT_CPUS + 1, DEFAULT_RAM, DEFAULT_DISK), CPU.veto(1));
- assertVetoes(makeTask(DEFAULT_CPUS, DEFAULT_RAM + 1, DEFAULT_DISK), RAM.veto(1));
- assertVetoes(makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK + 1), DISK.veto(1));
+ assertVetoes(makeTask(DEFAULT_CPUS + 1, DEFAULT_RAM, DEFAULT_DISK), EMPTY_JOB, CPU.veto(1));
+ assertVetoes(makeTask(DEFAULT_CPUS, DEFAULT_RAM + 1, DEFAULT_DISK), EMPTY_JOB, RAM.veto(1));
+ assertVetoes(makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK + 1), EMPTY_JOB, DISK.veto(1));
}
@Test
@@ -209,13 +204,13 @@ public class SchedulingFilterImplTest extends EasyMockTest {
control.replay();
- checkConstraint(HOST_A, DEDICATED_ATTRIBUTE, true, ROLE_A);
- assertVetoes(makeTask(OWNER_B, JOB_B), HOST_A, DEDICATED_HOST_VETO);
+ checkConstraint(EMPTY_JOB, HOST_A, DEDICATED_ATTRIBUTE, true, ROLE_A);
+ assertVetoes(makeTask(OWNER_B, JOB_B), HOST_A, EMPTY_JOB, DEDICATED_HOST_VETO);
}
@Test
public void testSharedDedicatedHost() throws Exception {
- String dedicated1 = "ads/adserver";
+ String dedicated1 = "userA/jobA";
String dedicated2 = "kestrel/kestrel";
expectGetHostAttributes(HOST_A, dedicated(dedicated1, dedicated2)).anyTimes();
@@ -224,19 +219,23 @@ public class SchedulingFilterImplTest extends EasyMockTest {
control.replay();
assertNoVetoes(checkConstraint(
- new Identity().setRole("ads"),
- "adserver",
+ new Identity().setRole("userA"),
+ "jobA",
+ EMPTY_JOB,
HOST_A,
DEDICATED_ATTRIBUTE,
true,
- dedicated1));
+ dedicated1),
+ EMPTY_JOB);
assertNoVetoes(checkConstraint(
new Identity().setRole("kestrel"),
"kestrel",
+ EMPTY_JOB,
HOST_A,
DEDICATED_ATTRIBUTE,
true,
- dedicated2));
+ dedicated2),
+ EMPTY_JOB);
}
@Test
@@ -248,44 +247,41 @@ public class SchedulingFilterImplTest extends EasyMockTest {
control.replay();
- checkConstraint(HOST_A, "jvm", true, "1.0");
- checkConstraint(HOST_A, "jvm", false, "4.0");
+ checkConstraint(EMPTY_JOB, HOST_A, "jvm", true, "1.0");
+ checkConstraint(EMPTY_JOB, HOST_A, "jvm", false, "4.0");
- checkConstraint(HOST_A, "jvm", true, "1.0", "2.0");
- checkConstraint(HOST_B, "jvm", false, "2.0", "3.0");
+ checkConstraint(EMPTY_JOB, HOST_A, "jvm", true, "1.0", "2.0");
+ checkConstraint(EMPTY_JOB, HOST_B, "jvm", false, "2.0", "3.0");
}
@Test
public void testHostScheduledForMaintenance() throws Exception {
expectGetHostAttributes(HOST_A, host(HOST_A), rack(RACK_A)).atLeastOnce();
- expectGetTasks();
expectGetHostMaintenanceStatus(HOST_A, MaintenanceMode.SCHEDULED);
control.replay();
- assertNoVetoes(makeTask(), HOST_A);
+ assertNoVetoes(makeTask(), HOST_A, EMPTY_JOB);
}
@Test
public void testHostDrainingForMaintenance() throws Exception {
expectGetHostAttributes(HOST_A, host(HOST_A), rack(RACK_A)).atLeastOnce();
- expectGetTasks();
expectGetHostMaintenanceStatus(HOST_A, MaintenanceMode.DRAINING);
control.replay();
- assertVetoes(makeTask(), ConstraintFilter.maintenanceVeto("draining"));
+ assertVetoes(makeTask(), EMPTY_JOB, ConstraintFilter.maintenanceVeto("draining"));
}
@Test
public void testHostDrainedForMaintenance() throws Exception {
expectGetHostAttributes(HOST_A, host(HOST_A), rack(RACK_A)).atLeastOnce();
- expectGetTasks();
expectGetHostMaintenanceStatus(HOST_A, MaintenanceMode.DRAINED);
control.replay();
- assertVetoes(makeTask(), ConstraintFilter.maintenanceVeto("drained"));
+ assertVetoes(makeTask(), EMPTY_JOB, ConstraintFilter.maintenanceVeto("drained"));
}
@Test
@@ -300,9 +296,12 @@ public class SchedulingFilterImplTest extends EasyMockTest {
Constraint constraint1 = makeConstraint("host", HOST_A);
Constraint constraint2 = makeConstraint(DEDICATED_ATTRIBUTE, "xxx");
- assertVetoes(makeTask(OWNER_A, JOB_A, constraint1, constraint2), HOST_A,
+ assertVetoes(
+ makeTask(OWNER_A, JOB_A, constraint1, constraint2),
+ HOST_A,
+ EMPTY_JOB,
mismatchVeto(DEDICATED_ATTRIBUTE));
- assertNoVetoes(makeTask(OWNER_B, JOB_B, constraint1, constraint2), HOST_B);
+ assertNoVetoes(makeTask(OWNER_B, JOB_B, constraint1, constraint2), HOST_B, EMPTY_JOB);
}
@Test
@@ -321,10 +320,12 @@ public class SchedulingFilterImplTest extends EasyMockTest {
assertVetoes(
makeTask(OWNER_A, JOB_A, hostLimit, makeConstraint(DEDICATED_ATTRIBUTE, "xxx")),
HOST_A,
+ EMPTY_JOB,
mismatchVeto(DEDICATED_ATTRIBUTE));
assertVetoes(
makeTask(OWNER_B, JOB_A, hostLimit, makeConstraint(DEDICATED_ATTRIBUTE, "xxx")),
HOST_B,
+ EMPTY_JOB,
mismatchVeto(DEDICATED_ATTRIBUTE));
}
@@ -332,12 +333,11 @@ public class SchedulingFilterImplTest extends EasyMockTest {
public void testUnderLimitNoTasks() throws Exception {
expectGetHostAttributes(HOST_A);
expectGetHostAttributes(HOST_A, host(HOST_A));
- expectGetTasks();
expectGetHostMaintenanceStatus(HOST_A);
control.replay();
- assertNoVetoes(hostLimitTask(2), HOST_A);
+ assertNoVetoes(hostLimitTask(2), HOST_A, EMPTY_JOB);
}
private Attribute host(String host) {
@@ -361,31 +361,30 @@ public class SchedulingFilterImplTest extends EasyMockTest {
expectGetHostMaintenanceStatus(HOST_B).atLeastOnce();
expectGetHostMaintenanceStatus(HOST_C).atLeastOnce();
- expectGetTasks(
+ CachedJobState jobState = new CachedJobState(ImmutableSet.of(
makeScheduledTask(OWNER_A, JOB_A, HOST_A),
makeScheduledTask(OWNER_B, JOB_A, HOST_A),
makeScheduledTask(OWNER_B, JOB_A, HOST_A),
makeScheduledTask(OWNER_A, JOB_A, HOST_B),
makeScheduledTask(OWNER_A, JOB_A, HOST_B),
makeScheduledTask(OWNER_B, JOB_A, HOST_B),
- makeScheduledTask(OWNER_A, JOB_A, HOST_C))
- .atLeastOnce();
+ makeScheduledTask(OWNER_A, JOB_A, HOST_C)));
control.replay();
- assertNoVetoes(hostLimitTask(OWNER_A, JOB_A, 2), HOST_A);
- assertVetoes(hostLimitTask(OWNER_A, JOB_A, 1), HOST_B, limitVeto(HOST_ATTRIBUTE));
- assertVetoes(hostLimitTask(OWNER_A, JOB_A, 2), HOST_B, limitVeto(HOST_ATTRIBUTE));
- assertNoVetoes(hostLimitTask(OWNER_A, JOB_A, 3), HOST_B);
+ assertNoVetoes(hostLimitTask(OWNER_A, JOB_A, 2), HOST_A, jobState);
+ assertVetoes(hostLimitTask(OWNER_A, JOB_A, 1), HOST_B, jobState, limitVeto(HOST_ATTRIBUTE));
+ assertVetoes(hostLimitTask(OWNER_A, JOB_A, 2), HOST_B, jobState, limitVeto(HOST_ATTRIBUTE));
+ assertNoVetoes(hostLimitTask(OWNER_A, JOB_A, 3), HOST_B, jobState);
- assertVetoes(rackLimitTask(OWNER_B, JOB_A, 2), HOST_B, limitVeto(RACK_ATTRIBUTE));
- assertVetoes(rackLimitTask(OWNER_B, JOB_A, 3), HOST_B, limitVeto(RACK_ATTRIBUTE));
- assertNoVetoes(rackLimitTask(OWNER_B, JOB_A, 4), HOST_B);
+ assertVetoes(rackLimitTask(OWNER_B, JOB_A, 2), HOST_B, jobState, limitVeto(RACK_ATTRIBUTE));
+ assertVetoes(rackLimitTask(OWNER_B, JOB_A, 3), HOST_B, jobState, limitVeto(RACK_ATTRIBUTE));
+ assertNoVetoes(rackLimitTask(OWNER_B, JOB_A, 4), HOST_B, jobState);
- assertNoVetoes(rackLimitTask(OWNER_B, JOB_A, 1), HOST_C);
+ assertNoVetoes(rackLimitTask(OWNER_B, JOB_A, 1), HOST_C, jobState);
- assertVetoes(rackLimitTask(OWNER_A, JOB_A, 1), HOST_C, limitVeto(RACK_ATTRIBUTE));
- assertNoVetoes(rackLimitTask(OWNER_B, JOB_A, 2), HOST_C);
+ assertVetoes(rackLimitTask(OWNER_A, JOB_A, 1), HOST_C, jobState, limitVeto(RACK_ATTRIBUTE));
+ assertNoVetoes(rackLimitTask(OWNER_B, JOB_A, 2), HOST_C, jobState);
}
@Test
@@ -396,19 +395,19 @@ public class SchedulingFilterImplTest extends EasyMockTest {
control.replay();
// Matches attribute, matching value.
- checkConstraint(HOST_A, "jvm", true, "1.0");
+ checkConstraint(EMPTY_JOB, HOST_A, "jvm", true, "1.0");
// Matches attribute, different value.
- checkConstraint(HOST_A, "jvm", false, "1.4");
+ checkConstraint(EMPTY_JOB, HOST_A, "jvm", false, "1.4");
// Does not match attribute.
- checkConstraint(HOST_A, "xxx", false, "1.4");
+ checkConstraint(EMPTY_JOB, HOST_A, "xxx", false, "1.4");
// Logical 'OR' matching attribute.
- checkConstraint(HOST_A, "jvm", false, "1.2", "1.4");
+ checkConstraint(EMPTY_JOB, HOST_A, "jvm", false, "1.2", "1.4");
// Logical 'OR' not matching attribute.
- checkConstraint(HOST_A, "xxx", false, "1.0", "1.4");
+ checkConstraint(EMPTY_JOB, HOST_A, "xxx", false, "1.0", "1.4");
}
@Test
@@ -421,32 +420,35 @@ public class SchedulingFilterImplTest extends EasyMockTest {
control.replay();
// Matches attribute, matching value.
- checkConstraint(HOST_A, "jvm", true, "1.4");
+ checkConstraint(EMPTY_JOB, HOST_A, "jvm", true, "1.4");
// Matches attribute, different value.
- checkConstraint(HOST_A, "jvm", false, "1.0");
+ checkConstraint(EMPTY_JOB, HOST_A, "jvm", false, "1.0");
// Does not match attribute.
- checkConstraint(HOST_A, "xxx", false, "1.4");
+ checkConstraint(EMPTY_JOB, HOST_A, "xxx", false, "1.4");
// Logical 'OR' with attribute and value match.
- checkConstraint(HOST_A, "jvm", true, "1.2", "1.4");
+ checkConstraint(EMPTY_JOB, HOST_A, "jvm", true, "1.2", "1.4");
// Does not match attribute.
- checkConstraint(HOST_A, "xxx", false, "1.0", "1.4");
+ checkConstraint(EMPTY_JOB, HOST_A, "xxx", false, "1.0", "1.4");
// Check that logical AND works.
Constraint jvmConstraint = makeConstraint("jvm", "1.6");
Constraint zoneConstraint = makeConstraint("zone", "c");
ITaskConfig task = makeTask(OWNER_A, JOB_A, jvmConstraint, zoneConstraint);
- assertTrue(defaultFilter.filter(DEFAULT_OFFER, HOST_A, task, TASK_ID).isEmpty());
+ assertTrue(defaultFilter.filter(DEFAULT_OFFER, HOST_A, task, TASK_ID, EMPTY_JOB).isEmpty());
Constraint jvmNegated = jvmConstraint.deepCopy();
jvmNegated.getConstraint().getValue().setNegated(true);
Constraint zoneNegated = jvmConstraint.deepCopy();
zoneNegated.getConstraint().getValue().setNegated(true);
- assertVetoes(makeTask(OWNER_A, JOB_A, jvmNegated, zoneNegated), HOST_A,
+ assertVetoes(
+ makeTask(OWNER_A, JOB_A, jvmNegated, zoneNegated),
+ HOST_A,
+ EMPTY_JOB,
mismatchVeto("jvm"));
}
@@ -461,25 +463,35 @@ public class SchedulingFilterImplTest extends EasyMockTest {
}
private ITaskConfig checkConstraint(
+ CachedJobState cachedJobState,
String host,
String constraintName,
boolean expected,
String value,
String... vs) {
- return checkConstraint(OWNER_A, JOB_A, host, constraintName, expected, value, vs);
+ return checkConstraint(
+ OWNER_A,
+ JOB_A,
+ cachedJobState,
+ host,
+ constraintName,
+ expected,
+ value,
+ vs);
}
private ITaskConfig checkConstraint(
Identity owner,
String jobName,
+ CachedJobState cachedJobState,
String host,
String constraintName,
boolean expected,
String value,
String... vs) {
- return checkConstraint(owner, jobName, host, constraintName, expected,
+ return checkConstraint(owner, jobName, cachedJobState, host, constraintName, expected,
new ValueConstraint(false,
ImmutableSet.<String>builder().add(value).addAll(Arrays.asList(vs)).build()));
}
@@ -487,6 +499,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
private ITaskConfig checkConstraint(
Identity owner,
String jobName,
+ CachedJobState cachedJobState,
String host,
String constraintName,
boolean expected,
@@ -496,32 +509,38 @@ public class SchedulingFilterImplTest extends EasyMockTest {
ITaskConfig task = makeTask(owner, jobName, constraint);
assertEquals(
expected,
- defaultFilter.filter(DEFAULT_OFFER, host, task, TASK_ID).isEmpty());
+ defaultFilter.filter(DEFAULT_OFFER, host, task, TASK_ID, cachedJobState).isEmpty());
Constraint negated = constraint.deepCopy();
negated.getConstraint().getValue().setNegated(!value.isNegated());
ITaskConfig negatedTask = makeTask(owner, jobName, negated);
assertEquals(
!expected,
- defaultFilter.filter(DEFAULT_OFFER, host, negatedTask, TASK_ID).isEmpty());
+ defaultFilter.filter(DEFAULT_OFFER, host, negatedTask, TASK_ID, cachedJobState).isEmpty());
return task;
}
- private void assertNoVetoes(ITaskConfig task) {
- assertNoVetoes(task, HOST_A);
+ private void assertNoVetoes(ITaskConfig task, CachedJobState jobState) {
+ assertNoVetoes(task, HOST_A, jobState);
}
- private void assertNoVetoes(ITaskConfig task, String host) {
- assertVetoes(task, host);
+ private void assertNoVetoes(ITaskConfig task, String host, CachedJobState jobState) {
+ assertVetoes(task, host, jobState);
}
- private void assertVetoes(ITaskConfig task, Veto... vetos) {
- assertVetoes(task, HOST_A, vetos);
+ private void assertVetoes(ITaskConfig task, CachedJobState jobState, Veto... vetos) {
+ assertVetoes(task, HOST_A, jobState, vetos);
}
- private void assertVetoes(ITaskConfig task, String host, Veto... vetoes) {
- assertEquals(ImmutableSet.copyOf(vetoes),
- defaultFilter.filter(DEFAULT_OFFER, host, task, TASK_ID));
+ private void assertVetoes(
+ ITaskConfig task,
+ String host,
+ CachedJobState jobState,
+ Veto... vetoes) {
+
+ assertEquals(
+ ImmutableSet.copyOf(vetoes),
+ defaultFilter.filter(DEFAULT_OFFER, host, task, TASK_ID, jobState));
}
private Attribute valueAttribute(String name, String string, String... strings) {
@@ -534,13 +553,6 @@ public class SchedulingFilterImplTest extends EasyMockTest {
TaskConstraint.value(new ValueConstraint(false, ImmutableSet.copyOf(values))));
}
- private IExpectationSetters<ImmutableSet<IScheduledTask>> expectGetTasks(
- IScheduledTask... tasks) {
-
- return expect(taskStore.fetchTasks((Query.Builder) anyObject()))
- .andReturn(ImmutableSet.copyOf(tasks));
- }
-
private IExpectationSetters<MaintenanceMode> expectGetHostMaintenanceStatus(String host) {
return expectGetHostMaintenanceStatus(host, MaintenanceMode.NONE);
}