You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2015/04/14 18:54:09 UTC
aurora git commit: Simplify AttributeAggregate.
Repository: aurora
Updated Branches:
refs/heads/master 9aab87f18 -> 3b29a4b79
Simplify AttributeAggregate.
Reviewed at https://reviews.apache.org/r/33106/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/3b29a4b7
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/3b29a4b7
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/3b29a4b7
Branch: refs/heads/master
Commit: 3b29a4b797e137f16b4fda76cd42073a0e5b3ad5
Parents: 9aab87f
Author: Bill Farner <wf...@apache.org>
Authored: Tue Apr 14 09:53:22 2015 -0700
Committer: Bill Farner <wf...@apache.org>
Committed: Tue Apr 14 09:53:22 2015 -0700
----------------------------------------------------------------------
.../aurora/benchmark/SchedulingBenchmarks.java | 18 +--
.../scheduler/filter/AttributeAggregate.java | 117 ++++++++++---------
.../scheduler/async/TaskSchedulerImplTest.java | 16 +--
.../scheduler/async/TaskSchedulerTest.java | 31 +++--
.../preemptor/PendingTaskProcessorTest.java | 8 +-
.../async/preemptor/PreemptorImplTest.java | 20 +---
.../async/preemptor/PreemptorModuleTest.java | 10 +-
.../preemptor/PreemptorSlotFinderTest.java | 13 +--
.../events/NotifyingSchedulingFilterTest.java | 18 +--
.../filter/AttributeAggregateTest.java | 92 +++++++--------
.../filter/SchedulingFilterImplTest.java | 108 +++++++----------
.../scheduler/state/TaskAssignerImplTest.java | 16 +--
12 files changed, 187 insertions(+), 280 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/3b29a4b7/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
index ce87344..0113505 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
@@ -21,8 +21,6 @@ import java.util.concurrent.TimeUnit;
import javax.inject.Singleton;
import com.google.common.base.Optional;
-import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.eventbus.EventBus;
@@ -50,7 +48,6 @@ import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl.Reserva
import org.apache.aurora.scheduler.async.preemptor.ClusterStateImpl;
import org.apache.aurora.scheduler.async.preemptor.Preemptor;
import org.apache.aurora.scheduler.async.preemptor.PreemptorModule;
-import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.events.EventSink;
import org.apache.aurora.scheduler.events.PubsubEvent;
import org.apache.aurora.scheduler.filter.AttributeAggregate;
@@ -309,7 +306,6 @@ public class SchedulingBenchmarks {
* Tests preemptor searching for a preemption slot in a completely filled up cluster.
*/
public static class PreemptorSlotSearchBenchmark extends AbstractBase {
-
@Override
protected BenchmarkSettings getSettings() {
return new BenchmarkSettings.Builder()
@@ -327,20 +323,8 @@ public class SchedulingBenchmarks {
@Override
public Boolean apply(final Storage.MutableStoreProvider storeProvider) {
IAssignedTask assignedTask = getSettings().getTask().getAssignedTask();
- final Query.Builder query = Query.jobScoped(assignedTask.getTask().getJob())
- .byStatus(org.apache.aurora.scheduler.base.Tasks.SLAVE_ASSIGNED_STATES);
-
- Supplier<ImmutableSet<IScheduledTask>> taskSupplier = Suppliers.memoize(
- new Supplier<ImmutableSet<IScheduledTask>>() {
- @Override
- public ImmutableSet<IScheduledTask> get() {
- return storeProvider.getTaskStore().fetchTasks(query);
- }
- });
-
AttributeAggregate aggregate =
- new AttributeAggregate(taskSupplier, storeProvider.getAttributeStore());
-
+ AttributeAggregate.getJobActiveState(storeProvider, assignedTask.getTask().getJob());
Optional<String> result =
preemptor.attemptPreemptionFor(assignedTask, aggregate, storeProvider);
http://git-wip-us.apache.org/repos/asf/aurora/blob/3b29a4b7/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java b/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java
index ed82ae9..bd74f89 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java
@@ -13,17 +13,13 @@
*/
package org.apache.aurora.scheduler.filter;
-import java.util.Map;
-
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
-import com.google.common.base.Optional;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.AtomicLongMap;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableMultiset;
+import com.google.common.collect.Multiset;
import com.twitter.common.collections.Pair;
import org.apache.aurora.scheduler.base.Query;
@@ -41,18 +37,20 @@ import static java.util.Objects.requireNonNull;
* once the job state may change (e.g. after exiting a write transaction). This is intended to
* capture job state once and avoid redundant queries.
* <p>
- * Note that while the state injected into this class is used lazily (to allow for queries to happen
- * only on-demand), calling {@link #equals(Object)} and {@link #hashCode()} rely on the aggregation
- * result, thus invoking the {@link Supplier} and {@link AttributeStore}.
+ * TODO(wfarner): Consider preserving this as only a helper class to compute the Multiset
+ * representing the aggregate, since this class is now a thin wrapper over a Multiset.
*/
-public class AttributeAggregate {
+public final class AttributeAggregate {
/**
- * A lazily-computed mapping from attribute name and value to the count of tasks with that
- * name/value combination. See doc for {@link #getNumTasksWithAttribute(String, String)} for
- * further details.
+ * A mapping from attribute name and value to the count of tasks with that name/value combination.
+ * See doc for {@link #getNumTasksWithAttribute(String, String)} for further details.
*/
- private final Supplier<Map<Pair<String, String>, Long>> aggregate;
+ private final Supplier<Multiset<Pair<String, String>>> aggregate;
+
+ private AttributeAggregate(Supplier<Multiset<Pair<String, String>>> aggregate) {
+ this.aggregate = Suppliers.memoize(aggregate);
+ }
/**
* Initializes an {@link AttributeAggregate} instance from data store.
@@ -65,58 +63,72 @@ public class AttributeAggregate {
final StoreProvider storeProvider,
final IJobKey jobKey) {
- Supplier<ImmutableSet<IScheduledTask>> taskSupplier = Suppliers.memoize(
- new Supplier<ImmutableSet<IScheduledTask>>() {
+ return create(
+ new Supplier<Iterable<IScheduledTask>>() {
@Override
- public ImmutableSet<IScheduledTask> get() {
- return storeProvider.getTaskStore().fetchTasks(
- Query.jobScoped(jobKey).byStatus(Tasks.SLAVE_ASSIGNED_STATES));
+ public Iterable<IScheduledTask> get() {
+ return storeProvider.getTaskStore()
+ .fetchTasks(Query.jobScoped(jobKey).byStatus(Tasks.SLAVE_ASSIGNED_STATES));
}
- });
- return new AttributeAggregate(taskSupplier, storeProvider.getAttributeStore());
+ },
+ storeProvider.getAttributeStore());
}
- /**
- * Creates a new attribute aggregate, which will be computed from the provided external state.
- *
- * @param activeTaskSupplier Supplier of active tasks within the aggregated scope.
- * @param attributeStore Source of host attributes to associate with tasks.
- */
- public AttributeAggregate(
- final Supplier<ImmutableSet<IScheduledTask>> activeTaskSupplier,
+ @VisibleForTesting
+ static AttributeAggregate create(
+ final Supplier<Iterable<IScheduledTask>> taskSupplier,
final AttributeStore attributeStore) {
- requireNonNull(activeTaskSupplier);
- requireNonNull(attributeStore);
-
- final Function<IScheduledTask, Iterable<IAttribute>> getHostAttributes =
- new Function<IScheduledTask, Iterable<IAttribute>>() {
+ final Function<String, Iterable<IAttribute>> getHostAttributes =
+ new Function<String, Iterable<IAttribute>>() {
@Override
- public Iterable<IAttribute> apply(IScheduledTask task) {
+ public Iterable<IAttribute> apply(String host) {
// Note: this assumes we have access to attributes for hosts where all active tasks
// reside.
- String host = requireNonNull(task.getAssignedTask().getSlaveHost());
+ requireNonNull(host);
return attributeStore.getHostAttributes(host).get().getAttributes();
}
};
- aggregate = Suppliers.memoize(new Supplier<Map<Pair<String, String>, Long>>() {
- @Override
- public Map<Pair<String, String>, Long> get() {
- AtomicLongMap<Pair<String, String>> counts = AtomicLongMap.create();
- Iterable<IAttribute> allAttributes =
- Iterables.concat(Iterables.transform(activeTaskSupplier.get(), getHostAttributes));
- for (IAttribute attribute : allAttributes) {
- for (String value : attribute.getValues()) {
- counts.incrementAndGet(Pair.of(attribute.getName(), value));
+ return create(Suppliers.compose(
+ new Function<Iterable<IScheduledTask>, Iterable<IAttribute>>() {
+ @Override
+ public Iterable<IAttribute> apply(Iterable<IScheduledTask> tasks) {
+ return FluentIterable.from(tasks)
+ .transform(Tasks.SCHEDULED_TO_SLAVE_HOST)
+ .transformAndConcat(getHostAttributes);
+ }
+ },
+ taskSupplier));
+ }
+
+ @VisibleForTesting
+ static AttributeAggregate create(Supplier<Iterable<IAttribute>> attributes) {
+ Supplier<Multiset<Pair<String, String>>> aggregator = Suppliers.compose(
+ new Function<Iterable<IAttribute>, Multiset<Pair<String, String>>>() {
+ @Override
+ public Multiset<Pair<String, String>> apply(Iterable<IAttribute> attributes) {
+ ImmutableMultiset.Builder<Pair<String, String>> builder = ImmutableMultiset.builder();
+ for (IAttribute attribute : attributes) {
+ for (String value : attribute.getValues()) {
+ builder.add(Pair.of(attribute.getName(), value));
+ }
+ }
+
+ return builder.build();
}
- }
+ },
+ attributes
+ );
- return ImmutableMap.copyOf(counts.asMap());
- }
- });
+ return new AttributeAggregate(aggregator);
}
+ @VisibleForTesting
+ public static final AttributeAggregate EMPTY =
+ new AttributeAggregate(Suppliers.<Multiset<Pair<String, String>>>ofInstance(
+ ImmutableMultiset.<Pair<String, String>>of()));
+
/**
* Gets the total number of tasks with a given attribute name and value combination.
* <p>
@@ -135,12 +147,11 @@ public class AttributeAggregate {
* @return Number of tasks in the job whose hosts have the provided attribute name and value.
*/
public long getNumTasksWithAttribute(String name, String value) {
- return Optional.fromNullable(aggregate.get().get(Pair.of(name, value)))
- .or(0L);
+ return aggregate.get().count(Pair.of(name, value));
}
@VisibleForTesting
- Map<Pair<String, String>, Long> getAggregates() {
+ Multiset<Pair<String, String>> getAggregates() {
return aggregate.get();
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/3b29a4b7/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
index c5643d9..b61abf9 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
@@ -15,7 +15,6 @@ package org.apache.aurora.scheduler.async;
import com.google.common.base.Function;
import com.google.common.base.Optional;
-import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableSet;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
@@ -43,14 +42,12 @@ import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.events.EventSink;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
import org.apache.aurora.scheduler.events.PubsubEventModule;
-import org.apache.aurora.scheduler.filter.AttributeAggregate;
import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
import org.apache.aurora.scheduler.state.PubsubTestUtil;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.state.TaskAssigner;
import org.apache.aurora.scheduler.state.TaskAssigner.Assignment;
import org.apache.aurora.scheduler.state.TaskAssigner.Assignment.Result;
-import org.apache.aurora.scheduler.storage.AttributeStore;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
import org.apache.aurora.scheduler.storage.Storage.MutateWork;
@@ -66,6 +63,7 @@ import org.junit.Test;
import static org.apache.aurora.gen.ScheduleStatus.PENDING;
import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
+import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY;
import static org.easymock.EasyMock.capture;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
@@ -96,7 +94,6 @@ public class TaskSchedulerImplTest extends EasyMockTest {
private Amount<Long, Time> reservationDuration;
private Amount<Long, Time> halfReservationDuration;
private EventSink eventSink;
- private AttributeAggregate emptyJob;
@Before
public void setUp() throws Exception {
@@ -113,9 +110,6 @@ public class TaskSchedulerImplTest extends EasyMockTest {
Injector injector = getInjector(storageUtil.storage);
scheduler = injector.getInstance(TaskScheduler.class);
eventSink = PubsubTestUtil.startPubsub(injector);
- emptyJob = new AttributeAggregate(
- Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()),
- createMock(AttributeStore.class));
}
private Injector getInjector(final Storage storageImpl) {
@@ -146,7 +140,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
expect(assigner.maybeAssign(
storageUtil.mutableStoreProvider,
OFFER,
- new ResourceRequest(task.getAssignedTask().getTask(), Tasks.id(task), emptyJob)))
+ new ResourceRequest(task.getAssignedTask().getTask(), Tasks.id(task), EMPTY)))
.andReturn(Assignment.success(TaskInfo.getDefaultInstance()));
}
@@ -207,7 +201,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
expect(assigner.maybeAssign(
storageUtil.mutableStoreProvider,
OFFER,
- new ResourceRequest(TASK_B.getAssignedTask().getTask(), Tasks.id(TASK_B), emptyJob)))
+ new ResourceRequest(TASK_B.getAssignedTask().getTask(), Tasks.id(TASK_B), EMPTY)))
.andReturn(Assignment.success(TaskInfo.getDefaultInstance()));
control.replay();
@@ -317,7 +311,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
expect(assigner.maybeAssign(
EasyMock.<MutableStoreProvider>anyObject(),
eq(OFFER),
- eq(new ResourceRequest(taskA.getAssignedTask().getTask(), Tasks.id(taskA), emptyJob))))
+ eq(new ResourceRequest(taskA.getAssignedTask().getTask(), Tasks.id(taskA), EMPTY))))
.andReturn(Assignment.success(TaskInfo.getDefaultInstance()));
control.replay();
@@ -346,7 +340,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
private void expectPreemptorCall(IScheduledTask task, Optional<String> result) {
expect(preemptor.attemptPreemptionFor(
task.getAssignedTask(),
- emptyJob,
+ EMPTY,
storageUtil.mutableStoreProvider)).andReturn(result);
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/3b29a4b7/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
index 88c0163..9c47a76 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
@@ -20,7 +20,6 @@ import java.util.concurrent.TimeUnit;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.RateLimiter;
@@ -59,7 +58,6 @@ import org.apache.aurora.scheduler.state.MaintenanceController;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.state.TaskAssigner;
import org.apache.aurora.scheduler.state.TaskAssigner.Assignment;
-import org.apache.aurora.scheduler.storage.AttributeStore;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
import org.apache.aurora.scheduler.storage.Storage.MutateWork;
@@ -89,6 +87,7 @@ import static org.apache.aurora.gen.ScheduleStatus.KILLED;
import static org.apache.aurora.gen.ScheduleStatus.LOST;
import static org.apache.aurora.gen.ScheduleStatus.PENDING;
import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY;
import static org.apache.mesos.Protos.Offer;
import static org.easymock.EasyMock.capture;
import static org.easymock.EasyMock.eq;
@@ -125,7 +124,6 @@ public class TaskSchedulerTest extends EasyMockTest {
private StatsProvider statsProvider;
private RescheduleCalculator rescheduleCalculator;
private Preemptor preemptor;
- private AttributeAggregate emptyJob;
private Amount<Long, Time> reservationDuration = Amount.of(1L, Time.MINUTES);
@Before
@@ -144,9 +142,6 @@ public class TaskSchedulerTest extends EasyMockTest {
statsProvider = createMock(StatsProvider.class);
rescheduleCalculator = createMock(RescheduleCalculator.class);
preemptor = createMock(Preemptor.class);
- emptyJob = new AttributeAggregate(
- Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()),
- createMock(AttributeStore.class));
}
private void replayAndCreateScheduler() {
@@ -315,11 +310,11 @@ public class TaskSchedulerTest extends EasyMockTest {
TaskInfo mesosTask = makeTaskInfo(task);
Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
- expectMaybeAssign(OFFER_A, task, emptyJob).andReturn(Assignment.failure());
+ expectMaybeAssign(OFFER_A, task, EMPTY).andReturn(Assignment.failure());
expectPreemptorCall(task);
Capture<Runnable> timeoutCapture2 = expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10);
- expectMaybeAssign(OFFER_A, task, emptyJob).andReturn(Assignment.success(mesosTask));
+ expectMaybeAssign(OFFER_A, task, EMPTY).andReturn(Assignment.success(mesosTask));
driver.launchTask(OFFER_A.getOffer().getId(), mesosTask);
Capture<Runnable> timeoutCapture3 = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
@@ -350,7 +345,7 @@ public class TaskSchedulerTest extends EasyMockTest {
Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
expectAnyMaintenanceCalls();
expectOfferDeclineIn(10);
- expectMaybeAssign(OFFER_A, task, emptyJob).andReturn(Assignment.success(mesosTask));
+ expectMaybeAssign(OFFER_A, task, EMPTY).andReturn(Assignment.success(mesosTask));
driver.launchTask(OFFER_A.getOffer().getId(), mesosTask);
expectLastCall().andThrow(new IllegalStateException("Driver not ready."));
expect(stateManager.changeState(
@@ -380,10 +375,10 @@ public class TaskSchedulerTest extends EasyMockTest {
Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
expectAnyMaintenanceCalls();
expectOfferDeclineIn(10);
- expectMaybeAssign(OFFER_A, task, emptyJob).andThrow(new StorageException("Injected failure."));
+ expectMaybeAssign(OFFER_A, task, EMPTY).andThrow(new StorageException("Injected failure."));
Capture<Runnable> timeoutCapture2 = expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10);
- expectMaybeAssign(OFFER_A, task, emptyJob).andReturn(Assignment.success(mesosTask));
+ expectMaybeAssign(OFFER_A, task, EMPTY).andReturn(Assignment.success(mesosTask));
driver.launchTask(OFFER_A.getOffer().getId(), mesosTask);
expectLastCall();
@@ -402,7 +397,7 @@ public class TaskSchedulerTest extends EasyMockTest {
Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
Capture<Runnable> offerExpirationCapture = expectOfferDeclineIn(10);
expectAnyMaintenanceCalls();
- expectMaybeAssign(OFFER_A, task, emptyJob).andReturn(Assignment.failure());
+ expectMaybeAssign(OFFER_A, task, EMPTY).andReturn(Assignment.failure());
Capture<Runnable> timeoutCapture2 = expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 10);
expectPreemptorCall(task);
driver.declineOffer(OFFER_A.getOffer().getId());
@@ -464,13 +459,13 @@ public class TaskSchedulerTest extends EasyMockTest {
IScheduledTask taskA = makeTask("A", PENDING);
TaskInfo mesosTaskA = makeTaskInfo(taskA);
- expectMaybeAssign(OFFER_A, taskA, emptyJob).andReturn(Assignment.success(mesosTaskA));
+ expectMaybeAssign(OFFER_A, taskA, EMPTY).andReturn(Assignment.success(mesosTaskA));
driver.launchTask(OFFER_A.getOffer().getId(), mesosTaskA);
Capture<Runnable> captureA = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
IScheduledTask taskB = makeTask("B", PENDING);
TaskInfo mesosTaskB = makeTaskInfo(taskB);
- expectMaybeAssign(OFFER_B, taskB, emptyJob).andReturn(Assignment.success(mesosTaskB));
+ expectMaybeAssign(OFFER_B, taskB, EMPTY).andReturn(Assignment.success(mesosTaskB));
driver.launchTask(OFFER_B.getOffer().getId(), mesosTaskB);
Capture<Runnable> captureB = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
@@ -496,7 +491,7 @@ public class TaskSchedulerTest extends EasyMockTest {
IScheduledTask taskA = makeTask("A", PENDING);
TaskInfo mesosTaskA = makeTaskInfo(taskA);
- expectMaybeAssign(OFFER_B, taskA, emptyJob).andReturn(Assignment.success(mesosTaskA));
+ expectMaybeAssign(OFFER_B, taskA, EMPTY).andReturn(Assignment.success(mesosTaskA));
driver.launchTask(OFFER_B.getOffer().getId(), mesosTaskA);
Capture<Runnable> captureA = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
@@ -505,7 +500,7 @@ public class TaskSchedulerTest extends EasyMockTest {
HostOffer updatedOfferC = new HostOffer(
OFFER_C.getOffer(),
IHostAttributes.build(OFFER_C.getAttributes().newBuilder().setMode(NONE)));
- expectMaybeAssign(updatedOfferC, taskB, emptyJob).andReturn(Assignment.success(mesosTaskB));
+ expectMaybeAssign(updatedOfferC, taskB, EMPTY).andReturn(Assignment.success(mesosTaskB));
driver.launchTask(OFFER_C.getOffer().getId(), mesosTaskB);
Capture<Runnable> captureB = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
@@ -602,7 +597,7 @@ public class TaskSchedulerTest extends EasyMockTest {
final IScheduledTask task = makeTask("a", PENDING);
Capture<Runnable> timeoutCapture = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
- expectMaybeAssign(OFFER_A, task, emptyJob).andReturn(Assignment.failure());
+ expectMaybeAssign(OFFER_A, task, EMPTY).andReturn(Assignment.failure());
expectTaskGroupBackoff(FIRST_SCHEDULE_DELAY_MS, 20);
expectPreemptorCall(task);
@@ -655,7 +650,7 @@ public class TaskSchedulerTest extends EasyMockTest {
private void expectPreemptorCall(IScheduledTask task) {
expect(preemptor.attemptPreemptionFor(
eq(task.getAssignedTask()),
- eq(emptyJob),
+ eq(EMPTY),
EasyMock.<MutableStoreProvider>anyObject())).andReturn(Optional.<String>absent());
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/3b29a4b7/src/test/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessorTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessorTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessorTest.java
index bcd1b4e..75fc16d 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessorTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessorTest.java
@@ -16,7 +16,6 @@ package org.apache.aurora.scheduler.async.preemptor;
import java.util.Arrays;
import com.google.common.base.Optional;
-import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableSet;
import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;
@@ -33,7 +32,6 @@ import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.filter.AttributeAggregate;
import org.apache.aurora.scheduler.stats.CachedCounters;
-import org.apache.aurora.scheduler.storage.AttributeStore;
import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -68,7 +66,6 @@ public class PendingTaskProcessorTest extends EasyMockTest {
private FakeStatsProvider statsProvider;
private PreemptionSlotFinder preemptionSlotFinder;
private PendingTaskProcessor slotFinder;
- private AttributeAggregate attrAggregate;
private PreemptionSlotCache slotCache;
private FakeClock clock;
@@ -80,9 +77,6 @@ public class PendingTaskProcessorTest extends EasyMockTest {
slotCache = createMock(PreemptionSlotCache.class);
statsProvider = new FakeStatsProvider();
clock = new FakeClock();
- attrAggregate = new AttributeAggregate(
- Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()),
- createMock(AttributeStore.class));
slotFinder = new PendingTaskProcessor(
storageUtil.storage,
@@ -135,7 +129,7 @@ public class PendingTaskProcessorTest extends EasyMockTest {
private void expectSlotSearch(ScheduledTask task, Optional<PreemptionSlot> slot) {
expect(preemptionSlotFinder.findPreemptionSlotFor(
IAssignedTask.build(task.getAssignedTask()),
- attrAggregate,
+ AttributeAggregate.EMPTY,
storageUtil.storeProvider)).andReturn(slot);
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/3b29a4b7/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java
index 281f4e0..97d6087 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorImplTest.java
@@ -14,9 +14,7 @@
package org.apache.aurora.scheduler.async.preemptor;
import com.google.common.base.Optional;
-import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableSet;
-
import com.twitter.common.testing.easymock.EasyMockTest;
import org.apache.aurora.gen.AssignedTask;
@@ -28,10 +26,8 @@ import org.apache.aurora.gen.TaskEvent;
import org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.PreemptionSlot;
import org.apache.aurora.scheduler.async.preemptor.Preemptor.PreemptorImpl;
import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.filter.AttributeAggregate;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.stats.CachedCounters;
-import org.apache.aurora.scheduler.storage.AttributeStore;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -43,6 +39,7 @@ import org.junit.Test;
import static org.apache.aurora.gen.ScheduleStatus.PENDING;
import static org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.slotValidationStatName;
import static org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.successStatName;
+import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
@@ -61,7 +58,6 @@ public class PreemptorImplTest extends EasyMockTest {
private FakeStatsProvider statsProvider;
private PreemptionSlotFinder preemptionSlotFinder;
private PreemptorImpl preemptor;
- private AttributeAggregate attrAggregate;
private PreemptionSlotCache slotCache;
private Storage.MutableStoreProvider storeProvider;
@@ -72,10 +68,6 @@ public class PreemptorImplTest extends EasyMockTest {
preemptionSlotFinder = createMock(PreemptionSlotFinder.class);
slotCache = createMock(PreemptionSlotCache.class);
statsProvider = new FakeStatsProvider();
- attrAggregate = new AttributeAggregate(
- Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()),
- createMock(AttributeStore.class));
-
preemptor = new PreemptorImpl(
stateManager,
preemptionSlotFinder,
@@ -124,15 +116,15 @@ public class PreemptorImplTest extends EasyMockTest {
}
private Optional<String> callPreemptor() {
- return preemptor.attemptPreemptionFor(TASK.getAssignedTask(), attrAggregate, storeProvider);
+ return preemptor.attemptPreemptionFor(TASK.getAssignedTask(), EMPTY, storeProvider);
}
private void expectSlotValidation(Optional<ImmutableSet<PreemptionVictim>> victims) {
expect(preemptionSlotFinder.validatePreemptionSlotFor(
- eq(TASK.getAssignedTask()),
- eq(attrAggregate),
- eq(SLOT),
- anyObject(Storage.MutableStoreProvider.class))).andReturn(victims);
+ TASK.getAssignedTask(),
+ EMPTY,
+ SLOT,
+ storeProvider)).andReturn(victims);
}
private void expectPreempted(IScheduledTask preempted) throws Exception {
http://git-wip-us.apache.org/repos/asf/aurora/blob/3b29a4b7/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModuleTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModuleTest.java
index 7e2d1c5..9d3820a 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModuleTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModuleTest.java
@@ -14,8 +14,6 @@
package org.apache.aurora.scheduler.async.preemptor;
import com.google.common.base.Optional;
-import com.google.common.base.Supplier;
-import com.google.common.collect.ImmutableSet;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
@@ -33,10 +31,8 @@ import org.apache.aurora.scheduler.filter.AttributeAggregate;
import org.apache.aurora.scheduler.filter.SchedulingFilter;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.state.TaskAssigner;
-import org.apache.aurora.scheduler.storage.AttributeStore;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
import org.junit.Before;
import org.junit.Test;
@@ -80,10 +76,6 @@ public class PreemptorModuleTest extends EasyMockTest {
Amount.of(0L, Time.SECONDS),
Amount.of(0L, Time.SECONDS)));
- Supplier<ImmutableSet<IScheduledTask>> taskSupplier =
- createMock(new EasyMockTest.Clazz<Supplier<ImmutableSet<IScheduledTask>>>() { });
- AttributeStore attributeStore = createMock(AttributeStore.class);
-
control.replay();
injector.getInstance(Key.get(ExceptionalCommand.class, StartupStage.class)).execute();
@@ -93,7 +85,7 @@ public class PreemptorModuleTest extends EasyMockTest {
Optional.<String>absent(),
injector.getInstance(Preemptor.class).attemptPreemptionFor(
IAssignedTask.build(new AssignedTask()),
- new AttributeAggregate(taskSupplier, attributeStore),
+ AttributeAggregate.EMPTY,
storageUtil.mutableStoreProvider));
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/3b29a4b7/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorSlotFinderTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorSlotFinderTest.java b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorSlotFinderTest.java
index b80e558..eed2de9 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorSlotFinderTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/preemptor/PreemptorSlotFinderTest.java
@@ -20,7 +20,6 @@ import java.util.Set;
import com.google.common.base.Function;
import com.google.common.base.Optional;
-import com.google.common.base.Suppliers;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMultimap;
@@ -48,16 +47,13 @@ import org.apache.aurora.scheduler.async.OfferManager;
import org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.PreemptionSlot;
import org.apache.aurora.scheduler.async.preemptor.PreemptionSlotFinder.PreemptionSlotFinderImpl;
import org.apache.aurora.scheduler.configuration.Resources;
-import org.apache.aurora.scheduler.filter.AttributeAggregate;
import org.apache.aurora.scheduler.filter.SchedulingFilter;
import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
import org.apache.aurora.scheduler.filter.SchedulingFilterImpl;
import org.apache.aurora.scheduler.mesos.TaskExecutors;
import org.apache.aurora.scheduler.stats.CachedCounters;
-import org.apache.aurora.scheduler.storage.AttributeStore;
import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
import org.apache.aurora.scheduler.testing.FakeStatsProvider;
import org.apache.mesos.Protos;
@@ -70,6 +66,7 @@ import org.junit.Test;
import static org.apache.aurora.gen.MaintenanceMode.NONE;
import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
import static org.apache.aurora.scheduler.async.preemptor.PreemptorMetrics.MISSING_ATTRIBUTES_NAME;
+import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY;
import static org.apache.mesos.Protos.Offer;
import static org.apache.mesos.Protos.Resource;
import static org.easymock.EasyMock.expect;
@@ -98,7 +95,6 @@ public class PreemptorSlotFinderTest extends EasyMockTest {
private FakeStatsProvider statsProvider;
private ClusterState clusterState;
private OfferManager offerManager;
- private AttributeAggregate emptyJob;
private PreemptorMetrics preemptorMetrics;
@Before
@@ -109,9 +105,6 @@ public class PreemptorSlotFinderTest extends EasyMockTest {
storageUtil.expectOperations();
statsProvider = new FakeStatsProvider();
preemptorMetrics = new PreemptorMetrics(new CachedCounters(statsProvider));
- emptyJob = new AttributeAggregate(
- Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()),
- createMock(AttributeStore.class));
}
private Optional<PreemptionSlot> runSlotFinder(ScheduledTask pendingTask) {
@@ -124,7 +117,7 @@ public class PreemptorSlotFinderTest extends EasyMockTest {
return slotFinder.findPreemptionSlotFor(
IAssignedTask.build(pendingTask.getAssignedTask()),
- emptyJob,
+ EMPTY,
storageUtil.mutableStoreProvider);
}
@@ -417,7 +410,7 @@ public class PreemptorSlotFinderTest extends EasyMockTest {
Optional<ImmutableSet<PreemptionVictim>> victims = slotFinder.validatePreemptionSlotFor(
IAssignedTask.build(p1.getAssignedTask()),
- emptyJob,
+ EMPTY,
slot.get(),
storageUtil.mutableStoreProvider);
http://git-wip-us.apache.org/repos/asf/aurora/blob/3b29a4b7/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java b/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
index 61cea32..2b71043 100644
--- a/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
@@ -15,7 +15,6 @@ package org.apache.aurora.scheduler.events;
import java.util.Set;
-import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableSet;
import com.twitter.common.testing.easymock.EasyMockTest;
@@ -31,9 +30,7 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
import org.apache.aurora.scheduler.mesos.TaskExecutors;
-import org.apache.aurora.scheduler.storage.AttributeStore;
import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
import org.junit.Before;
import org.junit.Test;
@@ -51,7 +48,8 @@ public class NotifyingSchedulingFilterTest extends EasyMockTest {
private static final UnusedResource RESOURCE = new UnusedResource(
ResourceSlot.from(TASK, TaskExecutors.NO_OVERHEAD_EXECUTOR),
IHostAttributes.build(new HostAttributes().setHost("host").setMode(MaintenanceMode.NONE)));
- private ResourceRequest request;
+ private static final ResourceRequest REQUEST =
+ new ResourceRequest(TASK, "taskId", AttributeAggregate.EMPTY);
private static final Veto VETO_1 = Veto.insufficientResources("ram", 1);
private static final Veto VETO_2 = Veto.insufficientResources("ram", 2);
@@ -65,30 +63,26 @@ public class NotifyingSchedulingFilterTest extends EasyMockTest {
delegate = createMock(SchedulingFilter.class);
eventSink = createMock(EventSink.class);
filter = new NotifyingSchedulingFilter(delegate, eventSink);
- AttributeAggregate emptyJob = new AttributeAggregate(
- Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()),
- createMock(AttributeStore.class));
- request = new ResourceRequest(TASK, "taskId", emptyJob);
}
@Test
public void testEvents() {
Set<Veto> vetoes = ImmutableSet.of(VETO_1, VETO_2);
- expect(delegate.filter(RESOURCE, request)).andReturn(vetoes);
+ expect(delegate.filter(RESOURCE, REQUEST)).andReturn(vetoes);
eventSink.post(new Vetoed(GROUP_KEY, vetoes));
control.replay();
- assertEquals(vetoes, filter.filter(RESOURCE, request));
+ assertEquals(vetoes, filter.filter(RESOURCE, REQUEST));
}
@Test
public void testNoVetoes() {
Set<Veto> vetoes = ImmutableSet.of();
- expect(delegate.filter(RESOURCE, request)).andReturn(vetoes);
+ expect(delegate.filter(RESOURCE, REQUEST)).andReturn(vetoes);
control.replay();
- assertEquals(vetoes, filter.filter(RESOURCE, request));
+ assertEquals(vetoes, filter.filter(RESOURCE, REQUEST));
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/3b29a4b7/src/test/java/org/apache/aurora/scheduler/filter/AttributeAggregateTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/filter/AttributeAggregateTest.java b/src/test/java/org/apache/aurora/scheduler/filter/AttributeAggregateTest.java
index 4b56576..6b36062 100644
--- a/src/test/java/org/apache/aurora/scheduler/filter/AttributeAggregateTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/filter/AttributeAggregateTest.java
@@ -13,12 +13,11 @@
*/
package org.apache.aurora.scheduler.filter;
-import java.util.Map;
-
import com.google.common.base.Optional;
-import com.google.common.base.Supplier;
-import com.google.common.collect.ImmutableMap;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.ImmutableMultiset;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Multiset;
import com.twitter.common.collections.Pair;
import com.twitter.common.testing.easymock.EasyMockTest;
@@ -37,65 +36,51 @@ import static org.easymock.EasyMock.expect;
import static org.junit.Assert.assertEquals;
public class AttributeAggregateTest extends EasyMockTest {
-
- private Supplier<ImmutableSet<IScheduledTask>> activeTaskSupplier;
private AttributeStore attributeStore;
- private AttributeAggregate aggregate;
@Before
public void setUp() throws Exception {
- activeTaskSupplier = createMock(new Clazz<Supplier<ImmutableSet<IScheduledTask>>>() { });
attributeStore = createMock(AttributeStore.class);
- aggregate = new AttributeAggregate(activeTaskSupplier, attributeStore);
}
@Test
public void testNoTasks() {
- expectGetTasks();
-
control.replay();
- assertAggregates(ImmutableMap.<Pair<String, String>, Long>of());
- assertAggregate("none", "alsoNone", 0);
+ AttributeAggregate aggregate = aggregate();
+ assertEquals(ImmutableMultiset.<Pair<String, String>>of(), aggregate.getAggregates());
+ assertAggregate(aggregate, "none", "alsoNone", 0);
}
@Test(expected = IllegalStateException.class)
public void testAttributesMissing() {
- expectGetTasks(task("1", "a"));
expect(attributeStore.getHostAttributes("a")).andReturn(Optional.<IHostAttributes>absent());
control.replay();
- aggregate.getAggregates();
+ aggregate(task("1", "a")).getAggregates();
}
@Test(expected = NullPointerException.class)
public void testTaskWithNoHost() {
- expectGetTasks(task("1", null));
-
control.replay();
- aggregate.getAggregates();
+ aggregate(task("1", null)).getAggregates();
}
@Test
public void testNoAttributes() {
- expectGetTasks(task("1", "hostA"));
expectGetAttributes("hostA");
control.replay();
- assertAggregates(ImmutableMap.<Pair<String, String>, Long>of());
+ assertEquals(
+ ImmutableMultiset.<Pair<String, String>>of(),
+ aggregate(task("1", "hostA")).getAggregates());
}
@Test
public void testAggregate() {
- expectGetTasks(
- task("1", "a1"),
- task("2", "b1"),
- task("3", "b1"),
- task("4", "b2"),
- task("5", "c1"));
expectGetAttributes(
"a1",
attribute("host", "a1"),
@@ -121,29 +106,37 @@ public class AttributeAggregateTest extends EasyMockTest {
control.replay();
- Map<Pair<String, String>, Long> expected = ImmutableMap.<Pair<String, String>, Long>builder()
- .put(Pair.of("rack", "a"), 1L)
- .put(Pair.of("rack", "b"), 3L)
- .put(Pair.of("rack", "c"), 1L)
- .put(Pair.of("host", "a1"), 1L)
- .put(Pair.of("host", "b1"), 2L)
- .put(Pair.of("host", "b2"), 1L)
- .put(Pair.of("host", "c1"), 1L)
- .put(Pair.of("pdu", "p1"), 4L)
- .put(Pair.of("pdu", "p2"), 4L)
- .put(Pair.of("ssd", "true"), 1L)
+ Multiset<Pair<String, String>> expected = ImmutableMultiset.<Pair<String, String>>builder()
+ .add(Pair.of("rack", "a"))
+ .addCopies(Pair.of("rack", "b"), 3)
+ .add(Pair.of("rack", "c"))
+ .add(Pair.of("host", "a1"))
+ .addCopies(Pair.of("host", "b1"), 2)
+ .add(Pair.of("host", "b2"))
+ .add(Pair.of("host", "c1"))
+ .addCopies(Pair.of("pdu", "p1"), 4)
+ .addCopies(Pair.of("pdu", "p2"), 4)
+ .add(Pair.of("ssd", "true"))
.build();
- assertAggregates(expected);
- for (Map.Entry<Pair<String, String>, Long> entry : expected.entrySet()) {
- assertAggregate(entry.getKey().getFirst(), entry.getKey().getSecond(), entry.getValue());
+ AttributeAggregate aggregate = aggregate(
+ task("1", "a1"),
+ task("2", "b1"),
+ task("3", "b1"),
+ task("4", "b2"),
+ task("5", "c1"));
+ assertEquals(expected, aggregate.getAggregates());
+ for (Multiset.Entry<Pair<String, String>> entry : expected.entrySet()) {
+ Pair<String, String> element = entry.getElement();
+ assertAggregate(aggregate, element.getFirst(), element.getSecond(), entry.getCount());
}
- assertAggregate("host", "c2", 0L);
- assertAggregate("hostc", "2", 0L);
+ assertAggregate(aggregate, "host", "c2", 0L);
+ assertAggregate(aggregate, "hostc", "2", 0L);
}
- private void expectGetTasks(IScheduledTask... activeTasks) {
- expect(activeTaskSupplier.get())
- .andReturn(ImmutableSet.<IScheduledTask>builder().add(activeTasks).build());
+ private AttributeAggregate aggregate(IScheduledTask... activeTasks) {
+ return AttributeAggregate.create(
+ Suppliers.<Iterable<IScheduledTask>>ofInstance(ImmutableSet.copyOf(activeTasks)),
+ attributeStore);
}
private IExpectationSetters<?> expectGetAttributes(String host, Attribute... attributes) {
@@ -153,11 +146,12 @@ public class AttributeAggregateTest extends EasyMockTest {
.setAttributes(ImmutableSet.<Attribute>builder().add(attributes).build()))));
}
- private void assertAggregates(Map<Pair<String, String>, Long> expected) {
- assertEquals(expected, aggregate.getAggregates());
- }
+ private void assertAggregate(
+ AttributeAggregate aggregate,
+ String name,
+ String value,
+ long expected) {
- private void assertAggregate(String name, String value, long expected) {
assertEquals(expected, aggregate.getNumTasksWithAttribute(name, value));
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/3b29a4b7/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
index d06b89c..26bad99 100644
--- a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
@@ -15,16 +15,14 @@ package org.apache.aurora.scheduler.filter;
import java.util.Arrays;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-import com.google.common.base.Optional;
import com.google.common.base.Suppliers;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.twitter.common.collections.Pair;
import com.twitter.common.testing.easymock.EasyMockTest;
-import org.apache.aurora.gen.AssignedTask;
import org.apache.aurora.gen.Attribute;
import org.apache.aurora.gen.Constraint;
import org.apache.aurora.gen.ExecutorConfig;
@@ -32,7 +30,6 @@ import org.apache.aurora.gen.HostAttributes;
import org.apache.aurora.gen.Identity;
import org.apache.aurora.gen.LimitConstraint;
import org.apache.aurora.gen.MaintenanceMode;
-import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.gen.TaskConfig;
import org.apache.aurora.gen.TaskConstraint;
import org.apache.aurora.gen.ValueConstraint;
@@ -45,20 +42,18 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter.VetoGroup;
import org.apache.aurora.scheduler.filter.SchedulingFilter.VetoType;
import org.apache.aurora.scheduler.mesos.Offers;
import org.apache.aurora.scheduler.mesos.TaskExecutors;
-import org.apache.aurora.scheduler.storage.AttributeStore;
+import org.apache.aurora.scheduler.storage.entities.IAttribute;
import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.easymock.IExpectationSetters;
import org.junit.Before;
import org.junit.Test;
import static org.apache.aurora.scheduler.configuration.ConfigurationManager.DEDICATED_ATTRIBUTE;
+import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY;
import static org.apache.aurora.scheduler.filter.SchedulingFilterImpl.ResourceVector.CPU;
import static org.apache.aurora.scheduler.filter.SchedulingFilterImpl.ResourceVector.DISK;
import static org.apache.aurora.scheduler.filter.SchedulingFilterImpl.ResourceVector.PORTS;
import static org.apache.aurora.scheduler.filter.SchedulingFilterImpl.ResourceVector.RAM;
-import static org.easymock.EasyMock.expect;
import static org.junit.Assert.assertEquals;
public class SchedulingFilterImplTest extends EasyMockTest {
@@ -92,20 +87,11 @@ public class SchedulingFilterImplTest extends EasyMockTest {
private static final ResourceSlot DEFAULT_OFFER = ResourceSlot.from(
Offers.createOffer(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK, Pair.of(80, 80)));
- private AttributeAggregate emptyJob;
-
- private final AtomicLong taskIdCounter = new AtomicLong();
-
private SchedulingFilter defaultFilter;
- private AttributeStore.Mutable attributeStore;
@Before
public void setUp() {
defaultFilter = new SchedulingFilterImpl(TaskExecutors.NO_OVERHEAD_EXECUTOR);
- attributeStore = createMock(AttributeStore.Mutable.class);
- emptyJob = new AttributeAggregate(
- Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()),
- attributeStore);
}
@Test
@@ -145,22 +131,22 @@ public class SchedulingFilterImplTest extends EasyMockTest {
none,
defaultFilter.filter(
new UnusedResource(twoPorts, hostA),
- new ResourceRequest(noPortTask, TASK_ID, emptyJob)));
+ new ResourceRequest(noPortTask, TASK_ID, EMPTY)));
assertEquals(
none,
defaultFilter.filter(
new UnusedResource(twoPorts, hostA),
- new ResourceRequest(onePortTask, TASK_ID, emptyJob)));
+ new ResourceRequest(onePortTask, TASK_ID, EMPTY)));
assertEquals(
none,
defaultFilter.filter(
new UnusedResource(twoPorts, hostA),
- new ResourceRequest(twoPortTask, TASK_ID, emptyJob)));
+ new ResourceRequest(twoPortTask, TASK_ID, EMPTY)));
assertEquals(
ImmutableSet.of(PORTS.veto(1)),
defaultFilter.filter(
new UnusedResource(twoPorts, hostA),
- new ResourceRequest(threePortTask, TASK_ID, emptyJob)));
+ new ResourceRequest(threePortTask, TASK_ID, EMPTY)));
}
@Test
@@ -296,38 +282,41 @@ public class SchedulingFilterImplTest extends EasyMockTest {
assertNoVetoes(hostLimitTask(2), hostAttributes(HOST_A, host(HOST_A)));
}
- private Attribute host(String host) {
+ private IAttribute host(String host) {
return valueAttribute(HOST_ATTRIBUTE, host);
}
- private Attribute rack(String rack) {
+ private IAttribute rack(String rack) {
return valueAttribute(RACK_ATTRIBUTE, rack);
}
- private Attribute dedicated(String value, String... values) {
+ private IAttribute dedicated(String value, String... values) {
return valueAttribute(DEDICATED_ATTRIBUTE, value, values);
}
@Test
public void testLimitWithinJob() throws Exception {
- expectGetHostAttributes(HOST_A, host(HOST_A), rack(RACK_A)).atLeastOnce();
- expectGetHostAttributes(HOST_B, host(HOST_B), rack(RACK_A)).atLeastOnce();
- expectGetHostAttributes(HOST_C, host(HOST_C), rack(RACK_B)).atLeastOnce();
-
- AttributeAggregate stateA = new AttributeAggregate(Suppliers.ofInstance(ImmutableSet.of(
- makeScheduledTask(OWNER_A, JOB_A, HOST_A),
- makeScheduledTask(OWNER_A, JOB_A, HOST_B),
- makeScheduledTask(OWNER_A, JOB_A, HOST_B),
- makeScheduledTask(OWNER_A, JOB_A, HOST_C))),
- attributeStore);
- AttributeAggregate stateB = new AttributeAggregate(Suppliers.ofInstance(ImmutableSet.of(
- makeScheduledTask(OWNER_B, JOB_A, HOST_A),
- makeScheduledTask(OWNER_B, JOB_A, HOST_A),
- makeScheduledTask(OWNER_B, JOB_A, HOST_B))),
- attributeStore);
-
control.replay();
+ AttributeAggregate stateA = AttributeAggregate.create(
+ Suppliers.<Iterable<IAttribute>>ofInstance(ImmutableList.of(
+ host(HOST_A),
+ rack(RACK_A),
+ host(HOST_B),
+ rack(RACK_A),
+ host(HOST_B),
+ rack(RACK_A),
+ host(HOST_C),
+ rack(RACK_B))));
+ AttributeAggregate stateB = AttributeAggregate.create(
+ Suppliers.<Iterable<IAttribute>>ofInstance(ImmutableList.of(
+ host(HOST_A),
+ rack(RACK_A),
+ host(HOST_A),
+ rack(RACK_A),
+ host(HOST_B),
+ rack(RACK_A))));
+
IHostAttributes hostA = hostAttributes(HOST_A, host(HOST_A), rack(RACK_A));
IHostAttributes hostB = hostAttributes(HOST_B, host(HOST_B), rack(RACK_A));
IHostAttributes hostC = hostAttributes(HOST_C, host(HOST_C), rack(RACK_B));
@@ -421,7 +410,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
ImmutableSet.<Veto>of(),
defaultFilter.filter(
new UnusedResource(DEFAULT_OFFER, hostA),
- new ResourceRequest(task, TASK_ID, emptyJob)));
+ new ResourceRequest(task, TASK_ID, EMPTY)));
Constraint jvmNegated = jvmConstraint.deepCopy();
jvmNegated.getConstraint().getValue().setNegated(true);
@@ -512,7 +501,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
return checkConstraint(
owner,
jobName,
- emptyJob,
+ EMPTY,
hostAttributes,
constraintName,
expected,
@@ -551,7 +540,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
}
private void assertNoVetoes(ITaskConfig task, IHostAttributes hostAttributes) {
- assertVetoes(task, hostAttributes, emptyJob);
+ assertVetoes(task, hostAttributes, EMPTY);
}
private void assertNoVetoes(
@@ -563,7 +552,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
}
private void assertVetoes(ITaskConfig task, IHostAttributes hostAttributes, Veto... vetoes) {
- assertVetoes(task, hostAttributes, emptyJob, vetoes);
+ assertVetoes(task, hostAttributes, EMPTY, vetoes);
}
private void assertVetoes(
@@ -582,25 +571,25 @@ public class SchedulingFilterImplTest extends EasyMockTest {
private static IHostAttributes hostAttributes(
String host,
MaintenanceMode mode,
- Attribute... attributes) {
+ IAttribute... attributes) {
return IHostAttributes.build(
new HostAttributes()
.setHost(host)
.setMode(mode)
- .setAttributes(ImmutableSet.<Attribute>builder().add(attributes).build()));
+ .setAttributes(IAttribute.toBuildersSet(ImmutableSet.copyOf(attributes))));
}
private static IHostAttributes hostAttributes(
String host,
- Attribute... attributes) {
+ IAttribute... attributes) {
return hostAttributes(host, MaintenanceMode.NONE, attributes);
}
- private Attribute valueAttribute(String name, String string, String... strings) {
- return new Attribute(name,
- ImmutableSet.<String>builder().add(string).addAll(Arrays.asList(strings)).build());
+ private IAttribute valueAttribute(String name, String string, String... strings) {
+ return IAttribute.build(new Attribute(name,
+ ImmutableSet.<String>builder().add(string).addAll(Arrays.asList(strings)).build()));
}
private static Constraint makeConstraint(String name, String... values) {
@@ -608,25 +597,6 @@ public class SchedulingFilterImplTest extends EasyMockTest {
TaskConstraint.value(new ValueConstraint(false, ImmutableSet.copyOf(values))));
}
- private IExpectationSetters<Optional<IHostAttributes>> expectGetHostAttributes(
- String host,
- Attribute... attributes) {
-
- IHostAttributes hostAttributes = IHostAttributes.build(new HostAttributes()
- .setHost(host)
- .setAttributes(ImmutableSet.<Attribute>builder().add(attributes).build()));
- return expect(attributeStore.getHostAttributes(host)).andReturn(Optional.of(hostAttributes));
- }
-
- private IScheduledTask makeScheduledTask(Identity owner, String jobName, String host) {
- return IScheduledTask.build(new ScheduledTask().setAssignedTask(
- new AssignedTask()
- .setSlaveHost(host)
- .setTaskId("Task-" + taskIdCounter.incrementAndGet())
- .setTask(hostLimitTask(owner, jobName, 1 /* Max per host not used here. */)
- .newBuilder())));
- }
-
private Constraint limitConstraint(String name, int value) {
return new Constraint(name, TaskConstraint.limit(new LimitConstraint(value)));
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/3b29a4b7/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
index aca0234..ff95c36 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
@@ -13,7 +13,6 @@
*/
package org.apache.aurora.scheduler.state;
-import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.twitter.common.testing.easymock.EasyMockTest;
@@ -26,7 +25,6 @@ import org.apache.aurora.gen.TaskConfig;
import org.apache.aurora.scheduler.HostOffer;
import org.apache.aurora.scheduler.ResourceSlot;
import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.filter.AttributeAggregate;
import org.apache.aurora.scheduler.filter.SchedulingFilter;
import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
@@ -34,7 +32,6 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
import org.apache.aurora.scheduler.mesos.MesosTaskFactory;
import org.apache.aurora.scheduler.state.TaskAssigner.Assignment;
import org.apache.aurora.scheduler.state.TaskAssigner.TaskAssignerImpl;
-import org.apache.aurora.scheduler.storage.AttributeStore;
import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.mesos.Protos.FrameworkID;
@@ -49,6 +46,7 @@ import org.apache.mesos.Protos.Value.Type;
import org.junit.Before;
import org.junit.Test;
+import static org.apache.aurora.scheduler.filter.AttributeAggregate.EMPTY;
import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
import static org.apache.mesos.Protos.Offer;
import static org.easymock.EasyMock.expect;
@@ -89,7 +87,6 @@ public class TaskAssignerImplTest extends EasyMockTest {
private SchedulingFilter filter;
private MesosTaskFactory taskFactory;
private TaskAssigner assigner;
- private AttributeAggregate emptyJob;
@Before
public void setUp() throws Exception {
@@ -98,16 +95,13 @@ public class TaskAssignerImplTest extends EasyMockTest {
filter = createMock(SchedulingFilter.class);
taskFactory = createMock(MesosTaskFactory.class);
assigner = new TaskAssignerImpl(stateManager, filter, taskFactory);
- emptyJob = new AttributeAggregate(
- Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()),
- createMock(AttributeStore.class));
}
@Test
public void testAssignNoVetoes() {
expect(filter.filter(
new UnusedResource(ResourceSlot.from(MESOS_OFFER), OFFER.getAttributes()),
- new ResourceRequest(TASK.getAssignedTask().getTask(), Tasks.id(TASK), emptyJob)))
+ new ResourceRequest(TASK.getAssignedTask().getTask(), Tasks.id(TASK), EMPTY)))
.andReturn(ImmutableSet.<Veto>of());
expect(stateManager.assignTask(
storeProvider,
@@ -126,14 +120,14 @@ public class TaskAssignerImplTest extends EasyMockTest {
assigner.maybeAssign(
storeProvider,
OFFER,
- new ResourceRequest(TASK.getAssignedTask().getTask(), Tasks.id(TASK), emptyJob)));
+ new ResourceRequest(TASK.getAssignedTask().getTask(), Tasks.id(TASK), EMPTY)));
}
@Test
public void testAssignVetoes() {
expect(filter.filter(
new UnusedResource(ResourceSlot.from(MESOS_OFFER), OFFER.getAttributes()),
- new ResourceRequest(TASK.getAssignedTask().getTask(), Tasks.id(TASK), emptyJob)))
+ new ResourceRequest(TASK.getAssignedTask().getTask(), Tasks.id(TASK), EMPTY)))
.andReturn(ImmutableSet.of(Veto.constraintMismatch("denied")));
control.replay();
@@ -143,6 +137,6 @@ public class TaskAssignerImplTest extends EasyMockTest {
assigner.maybeAssign(
storeProvider,
OFFER,
- new ResourceRequest(TASK.getAssignedTask().getTask(), Tasks.id(TASK), emptyJob)));
+ new ResourceRequest(TASK.getAssignedTask().getTask(), Tasks.id(TASK), EMPTY)));
}
}