You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2017/11/28 19:02:35 UTC
[1/3] aurora git commit: Enable custom offer scoring modules for task assignment
Repository: aurora
Updated Branches:
refs/heads/master 21af250c9 -> 80139da46
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
index 6b18296..ff80baa 100644
--- a/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
@@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
+import org.apache.aurora.common.collections.Pair;
import org.apache.aurora.common.quantity.Amount;
import org.apache.aurora.common.quantity.Time;
import org.apache.aurora.common.testing.easymock.EasyMockTest;
@@ -32,9 +33,12 @@ import org.apache.aurora.scheduler.base.TaskGroupKey;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
import org.apache.aurora.scheduler.events.PubsubEvent.HostAttributesChanged;
+import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
import org.apache.aurora.scheduler.mesos.Driver;
import org.apache.aurora.scheduler.offers.Deferment.Noop;
-import org.apache.aurora.scheduler.offers.OfferManager.OfferManagerImpl;
+import org.apache.aurora.scheduler.resources.ResourceBag;
import org.apache.aurora.scheduler.resources.ResourceType;
import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -54,11 +58,13 @@ import static org.apache.aurora.gen.MaintenanceMode.DRAINING;
import static org.apache.aurora.gen.MaintenanceMode.NONE;
import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB;
import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask;
-import static org.apache.aurora.scheduler.offers.OfferManager.OfferManagerImpl.GLOBALLY_BANNED_OFFERS;
-import static org.apache.aurora.scheduler.offers.OfferManager.OfferManagerImpl.OFFER_ACCEPT_RACES;
-import static org.apache.aurora.scheduler.offers.OfferManager.OfferManagerImpl.OFFER_CANCEL_FAILURES;
-import static org.apache.aurora.scheduler.offers.OfferManager.OfferManagerImpl.OUTSTANDING_OFFERS;
-import static org.apache.aurora.scheduler.offers.OfferManager.OfferManagerImpl.STATICALLY_BANNED_OFFERS;
+import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty;
+import static org.apache.aurora.scheduler.offers.OfferManagerImpl.GLOBALLY_BANNED_OFFERS;
+import static org.apache.aurora.scheduler.offers.OfferManagerImpl.OFFER_ACCEPT_RACES;
+import static org.apache.aurora.scheduler.offers.OfferManagerImpl.OFFER_CANCEL_FAILURES;
+import static org.apache.aurora.scheduler.offers.OfferManagerImpl.OUTSTANDING_OFFERS;
+import static org.apache.aurora.scheduler.offers.OfferManagerImpl.STATICALLY_BANNED_OFFERS;
+import static org.apache.aurora.scheduler.offers.OfferManagerImpl.VETO_EVALUATED_OFFERS;
import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosRange;
import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosScalar;
import static org.apache.aurora.scheduler.resources.ResourceTestUtil.offer;
@@ -66,6 +72,8 @@ import static org.apache.aurora.scheduler.resources.ResourceType.CPUS;
import static org.apache.aurora.scheduler.resources.ResourceType.DISK_MB;
import static org.apache.aurora.scheduler.resources.ResourceType.PORTS;
import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -89,21 +97,25 @@ public class OfferManagerImplTest extends EasyMockTest {
private static final String HOST_C = "HOST_C";
private static final HostOffer OFFER_C = new HostOffer(
Offers.makeOffer("OFFER_C", HOST_C),
- IHostAttributes.build(new HostAttributes().setMode(NONE)));
+ IHostAttributes.build(new HostAttributes().setMode(NONE).setHost(HOST_C)));
private static final int PORT = 1000;
private static final Protos.Offer MESOS_OFFER = offer(mesosRange(PORTS, PORT));
private static final IScheduledTask TASK = makeTask("id", JOB);
+ private static final ResourceRequest EMPTY_REQUEST = new ResourceRequest(
+ TASK.getAssignedTask().getTask(),
+ ResourceBag.EMPTY,
+ empty());
private static final TaskGroupKey GROUP_KEY = TaskGroupKey.from(TASK.getAssignedTask().getTask());
private static final TaskInfo TASK_INFO = TaskInfo.newBuilder()
.setName("taskName")
.setTaskId(Protos.TaskID.newBuilder().setValue(Tasks.id(TASK)))
.setAgentId(MESOS_OFFER.getAgentId())
.build();
- private static Operation launch = Operation.newBuilder()
+ private static final Operation LAUNCH = Operation.newBuilder()
.setType(Operation.Type.LAUNCH)
.setLaunch(Operation.Launch.newBuilder().addTaskInfos(TASK_INFO))
.build();
- private static final List<Operation> OPERATIONS = ImmutableList.of(launch);
+ private static final List<Operation> OPERATIONS = ImmutableList.of(LAUNCH);
private static final long OFFER_FILTER_SECONDS = 0;
private static final Filters OFFER_FILTER = Filters.newBuilder()
.setRefuseSeconds(OFFER_FILTER_SECONDS)
@@ -113,6 +125,7 @@ public class OfferManagerImplTest extends EasyMockTest {
private Driver driver;
private OfferManagerImpl offerManager;
private FakeStatsProvider statsProvider;
+ private SchedulingFilter schedulingFilter;
@Before
public void setUp() {
@@ -125,7 +138,13 @@ public class OfferManagerImplTest extends EasyMockTest {
FAKE_TICKER
);
statsProvider = new FakeStatsProvider();
- offerManager = new OfferManagerImpl(driver, offerSettings, statsProvider, new Noop());
+ schedulingFilter = createMock(SchedulingFilter.class);
+
+ offerManager = new OfferManagerImpl(driver,
+ offerSettings,
+ statsProvider,
+ new Noop(),
+ schedulingFilter);
}
@Test
@@ -136,11 +155,11 @@ public class OfferManagerImplTest extends EasyMockTest {
control.replay();
- offerManager.addOffer(hostOfferB);
- offerManager.addOffer(OFFER_A);
- offerManager.addOffer(hostOfferC);
+ offerManager.add(hostOfferB);
+ offerManager.add(OFFER_A);
+ offerManager.add(hostOfferC);
- List<HostOffer> actual = ImmutableList.copyOf(offerManager.getOffers());
+ List<HostOffer> actual = ImmutableList.copyOf(offerManager.getAll());
assertEquals(
// hostOfferC has a further away start time, so it should be preferred.
@@ -160,15 +179,15 @@ public class OfferManagerImplTest extends EasyMockTest {
control.replay();
- offerManager.addOffer(offerA);
+ offerManager.add(offerA);
assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS));
- offerManager.addOffer(OFFER_B);
+ offerManager.add(OFFER_B);
assertEquals(2, statsProvider.getLongValue(OUTSTANDING_OFFERS));
- offerManager.addOffer(offerC);
+ offerManager.add(offerC);
assertEquals(3, statsProvider.getLongValue(OUTSTANDING_OFFERS));
assertEquals(
ImmutableSet.of(OFFER_B, offerA, offerC),
- ImmutableSet.copyOf(offerManager.getOffers()));
+ ImmutableSet.copyOf(offerManager.getAll()));
offerManager.launchTask(OFFER_B.getOffer().getId(), TASK_INFO);
assertEquals(2, statsProvider.getLongValue(OUTSTANDING_OFFERS));
}
@@ -179,19 +198,19 @@ public class OfferManagerImplTest extends EasyMockTest {
offerManager.hostAttributesChanged(new HostAttributesChanged(HOST_ATTRIBUTES_A));
- offerManager.addOffer(OFFER_A);
- offerManager.addOffer(OFFER_B);
- assertEquals(ImmutableSet.of(OFFER_A, OFFER_B), ImmutableSet.copyOf(offerManager.getOffers()));
+ offerManager.add(OFFER_A);
+ offerManager.add(OFFER_B);
+ assertEquals(ImmutableSet.of(OFFER_A, OFFER_B), ImmutableSet.copyOf(offerManager.getAll()));
HostOffer offerA = setMode(OFFER_A, DRAINING);
offerManager.hostAttributesChanged(new HostAttributesChanged(offerA.getAttributes()));
- assertEquals(ImmutableSet.of(OFFER_B, offerA), ImmutableSet.copyOf(offerManager.getOffers()));
+ assertEquals(ImmutableSet.of(OFFER_B, offerA), ImmutableSet.copyOf(offerManager.getAll()));
offerA = setMode(OFFER_A, NONE);
HostOffer offerB = setMode(OFFER_B, DRAINING);
offerManager.hostAttributesChanged(new HostAttributesChanged(offerA.getAttributes()));
offerManager.hostAttributesChanged(new HostAttributesChanged(offerB.getAttributes()));
- assertEquals(ImmutableSet.of(OFFER_A, OFFER_B), ImmutableSet.copyOf(offerManager.getOffers()));
+ assertEquals(ImmutableSet.of(OFFER_A, OFFER_B), ImmutableSet.copyOf(offerManager.getAll()));
}
@Test
@@ -201,9 +220,9 @@ public class OfferManagerImplTest extends EasyMockTest {
control.replay();
- offerManager.addOffer(OFFER_A);
+ offerManager.add(OFFER_A);
assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS));
- offerManager.addOffer(OFFER_A);
+ offerManager.add(OFFER_A);
assertEquals(0, statsProvider.getLongValue(OUTSTANDING_OFFERS));
}
@@ -211,74 +230,83 @@ public class OfferManagerImplTest extends EasyMockTest {
public void testGetOffersReturnsAllOffers() {
control.replay();
- offerManager.addOffer(OFFER_A);
- assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers()));
+ offerManager.add(OFFER_A);
+ assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getAll()));
assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS));
- offerManager.cancelOffer(OFFER_A_ID);
+ offerManager.cancel(OFFER_A_ID);
assertEquals(0, statsProvider.getLongValue(OFFER_CANCEL_FAILURES));
- assertTrue(Iterables.isEmpty(offerManager.getOffers()));
+ assertTrue(Iterables.isEmpty(offerManager.getAll()));
assertEquals(0, statsProvider.getLongValue(OUTSTANDING_OFFERS));
}
@Test
public void testOfferFilteringDueToStaticBan() {
+ expectFilterNone();
+
control.replay();
// Static ban ignored when no offers.
- offerManager.banOfferForTaskGroup(OFFER_A_ID, GROUP_KEY);
+ offerManager.banForTaskGroup(OFFER_A_ID, GROUP_KEY);
assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
- offerManager.addOffer(OFFER_A);
- assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers(GROUP_KEY)));
- assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers()));
+ offerManager.add(OFFER_A);
+ assertEquals(OFFER_A,
+ Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+ assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getAll()));
// Add static ban.
- offerManager.banOfferForTaskGroup(OFFER_A_ID, GROUP_KEY);
+ offerManager.banForTaskGroup(OFFER_A_ID, GROUP_KEY);
assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
- assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers()));
- assertTrue(Iterables.isEmpty(offerManager.getOffers(GROUP_KEY)));
+ assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getAll()));
+ assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
}
@Test
public void testStaticBanExpiresAfterMaxHoldTime() throws InterruptedException {
+ expectFilterNone();
+
control.replay();
- offerManager.addOffer(OFFER_A);
- offerManager.banOfferForTaskGroup(OFFER_A_ID, GROUP_KEY);
- assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers()));
- assertTrue(Iterables.isEmpty(offerManager.getOffers(GROUP_KEY)));
+ offerManager.add(OFFER_A);
+ offerManager.banForTaskGroup(OFFER_A_ID, GROUP_KEY);
+ assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getAll()));
+ assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
// Make sure the static ban expires after maximum amount of time an offer is held.
FAKE_TICKER.advance(RETURN_DELAY);
offerManager.cleanupStaticBans();
- assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers(GROUP_KEY)));
+ assertEquals(OFFER_A,
+ Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
}
@Test
public void testStaticBanIsClearedOnDriverDisconnect() {
+ expectFilterNone();
+
control.replay();
- offerManager.addOffer(OFFER_A);
- offerManager.banOfferForTaskGroup(OFFER_A_ID, GROUP_KEY);
- assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers()));
- assertTrue(Iterables.isEmpty(offerManager.getOffers(GROUP_KEY)));
+ offerManager.add(OFFER_A);
+ offerManager.banForTaskGroup(OFFER_A_ID, GROUP_KEY);
+ assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getAll()));
+ assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
// Make sure the static ban is cleared when driver is disconnected.
offerManager.driverDisconnected(new DriverDisconnected());
assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
- offerManager.addOffer(OFFER_A);
- assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers(GROUP_KEY)));
+ offerManager.add(OFFER_A);
+ assertEquals(OFFER_A,
+ Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
}
@Test
public void testGetOffer() {
control.replay();
- offerManager.addOffer(OFFER_A);
- assertEquals(Optional.of(OFFER_A), offerManager.getOffer(OFFER_A.getOffer().getAgentId()));
+ offerManager.add(OFFER_A);
+ assertEquals(Optional.of(OFFER_A), offerManager.get(OFFER_A.getOffer().getAgentId()));
assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS));
}
@@ -289,7 +317,7 @@ public class OfferManagerImplTest extends EasyMockTest {
control.replay();
- offerManager.addOffer(OFFER_A);
+ offerManager.add(OFFER_A);
offerManager.launchTask(OFFER_A_ID, TASK_INFO);
}
@@ -308,8 +336,8 @@ public class OfferManagerImplTest extends EasyMockTest {
public void testFlushOffers() {
control.replay();
- offerManager.addOffer(OFFER_A);
- offerManager.addOffer(OFFER_B);
+ offerManager.add(OFFER_A);
+ offerManager.add(OFFER_B);
assertEquals(2, statsProvider.getLongValue(OUTSTANDING_OFFERS));
offerManager.driverDisconnected(new DriverDisconnected());
assertEquals(0, statsProvider.getLongValue(OUTSTANDING_OFFERS));
@@ -319,26 +347,29 @@ public class OfferManagerImplTest extends EasyMockTest {
public void testCancelFailure() {
control.replay();
- offerManager.cancelOffer(OFFER_A.getOffer().getId());
+ offerManager.cancel(OFFER_A.getOffer().getId());
assertEquals(1, statsProvider.getLongValue(OFFER_CANCEL_FAILURES));
}
@Test
public void testBanAndUnbanOffer() {
+ expectFilterNone();
+
control.replay();
// After adding a banned offer, user can see it is in OUTSTANDING_OFFERS but cannot retrieve it.
- offerManager.banOffer(OFFER_A_ID);
- offerManager.addOffer(OFFER_A);
+ offerManager.ban(OFFER_A_ID);
+ offerManager.add(OFFER_A);
assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS));
assertEquals(1, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS));
- assertTrue(Iterables.isEmpty(offerManager.getOffers(GROUP_KEY)));
+ assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
- offerManager.cancelOffer(OFFER_A_ID);
- offerManager.addOffer(OFFER_A);
+ offerManager.cancel(OFFER_A_ID);
+ offerManager.add(OFFER_A);
assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS));
assertEquals(0, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS));
- assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers(GROUP_KEY)));
+ assertEquals(OFFER_A,
+ Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
}
private static HostOffer setUnavailability(HostOffer offer, long startMs) {
@@ -363,7 +394,7 @@ public class OfferManagerImplTest extends EasyMockTest {
RETURN_DELAY,
Long.MAX_VALUE,
FAKE_TICKER);
- return new OfferManagerImpl(driver, settings, statsProvider, new Noop());
+ return new OfferManagerImpl(driver, settings, statsProvider, new Noop(), schedulingFilter);
}
@Test
@@ -377,25 +408,25 @@ public class OfferManagerImplTest extends EasyMockTest {
mesosScalar(CPUS, 24.0, true),
mesosScalar(RAM_MB, 1024)),
HOST_ATTRIBUTES_A), DRAINING);
-
HostOffer medium = setMode(new HostOffer(
offer("host2", mesosScalar(CPUS, 5.0), mesosScalar(RAM_MB, 1024)),
HOST_ATTRIBUTES_A), DRAINING);
-
HostOffer large = setMode(new HostOffer(
offer("host3", mesosScalar(CPUS, 10.0), mesosScalar(RAM_MB, 1024)),
HOST_ATTRIBUTES_A), DRAINING);
+ expectFilterNone();
+
control.replay();
- cpuManager.addOffer(medium);
- cpuManager.addOffer(large);
- cpuManager.addOffer(small);
+ cpuManager.add(medium);
+ cpuManager.add(large);
+ cpuManager.add(small);
assertEquals(ImmutableList.of(small, medium, large),
- ImmutableList.copyOf(cpuManager.getOffers(GROUP_KEY)));
+ ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
assertEquals(ImmutableList.of(small, medium, large),
- ImmutableList.copyOf(cpuManager.getOffers()));
+ ImmutableList.copyOf(cpuManager.getAll()));
}
@Test
@@ -410,7 +441,6 @@ public class OfferManagerImplTest extends EasyMockTest {
mesosScalar(CPUS, 23.0, true),
mesosScalar(RAM_MB, 1024)),
HOST_ATTRIBUTES_A), DRAINING);
-
HostOffer medium = setMode(new HostOffer(
offer(
"host1",
@@ -418,21 +448,22 @@ public class OfferManagerImplTest extends EasyMockTest {
mesosScalar(CPUS, 24.0, true),
mesosScalar(RAM_MB, 1024)),
HOST_ATTRIBUTES_A), DRAINING);
-
HostOffer large = setMode(new HostOffer(
offer("host3", mesosScalar(CPUS, 1.0), mesosScalar(RAM_MB, 1024)),
HOST_ATTRIBUTES_A), DRAINING);
+ expectFilterNone();
+
control.replay();
- cpuManager.addOffer(medium);
- cpuManager.addOffer(large);
- cpuManager.addOffer(small);
+ cpuManager.add(medium);
+ cpuManager.add(large);
+ cpuManager.add(small);
assertEquals(ImmutableList.of(small, medium, large),
- ImmutableList.copyOf(cpuManager.getOffers(GROUP_KEY)));
+ ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, true)));
assertEquals(ImmutableList.of(small, medium, large),
- ImmutableList.copyOf(cpuManager.getOffers()));
+ ImmutableList.copyOf(cpuManager.getAll()));
}
@Test
@@ -442,25 +473,25 @@ public class OfferManagerImplTest extends EasyMockTest {
HostOffer small = setMode(new HostOffer(
offer("host1", mesosScalar(CPUS, 1), mesosScalar(RAM_MB, 1), mesosScalar(DISK_MB, 1.0)),
HOST_ATTRIBUTES_A), DRAINING);
-
HostOffer medium = setMode(new HostOffer(
offer("host2", mesosScalar(CPUS, 1), mesosScalar(RAM_MB, 1), mesosScalar(DISK_MB, 5.0)),
HOST_ATTRIBUTES_A), DRAINING);
-
HostOffer large = setMode(new HostOffer(
offer("host3", mesosScalar(CPUS, 1), mesosScalar(RAM_MB, 1), mesosScalar(DISK_MB, 10.0)),
HOST_ATTRIBUTES_A), DRAINING);
+ expectFilterNone();
+
control.replay();
- cpuManager.addOffer(medium);
- cpuManager.addOffer(large);
- cpuManager.addOffer(small);
+ cpuManager.add(medium);
+ cpuManager.add(large);
+ cpuManager.add(small);
assertEquals(ImmutableList.of(small, medium, large),
- ImmutableList.copyOf(cpuManager.getOffers(GROUP_KEY)));
+ ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
assertEquals(ImmutableList.of(small, medium, large),
- ImmutableList.copyOf(cpuManager.getOffers()));
+ ImmutableList.copyOf(cpuManager.getAll()));
}
@Test
@@ -470,25 +501,25 @@ public class OfferManagerImplTest extends EasyMockTest {
HostOffer small = setMode(new HostOffer(
offer("host1", mesosScalar(CPUS, 10), mesosScalar(RAM_MB, 1.0)),
HOST_ATTRIBUTES_A), DRAINING);
-
HostOffer medium = setMode(new HostOffer(
offer("host2", mesosScalar(CPUS, 10), mesosScalar(RAM_MB, 5.0)),
HOST_ATTRIBUTES_A), DRAINING);
-
HostOffer large = setMode(new HostOffer(
offer("host3", mesosScalar(CPUS, 10), mesosScalar(RAM_MB, 10.0)),
HOST_ATTRIBUTES_A), DRAINING);
+ expectFilterNone();
+
control.replay();
- cpuManager.addOffer(medium);
- cpuManager.addOffer(large);
- cpuManager.addOffer(small);
+ cpuManager.add(medium);
+ cpuManager.add(large);
+ cpuManager.add(small);
assertEquals(ImmutableList.of(small, medium, large),
- ImmutableList.copyOf(cpuManager.getOffers(GROUP_KEY)));
+ ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
assertEquals(ImmutableList.of(small, medium, large),
- ImmutableList.copyOf(cpuManager.getOffers()));
+ ImmutableList.copyOf(cpuManager.getAll()));
}
@Test
@@ -502,14 +533,12 @@ public class OfferManagerImplTest extends EasyMockTest {
mesosScalar(RAM_MB, 2.0),
mesosScalar(DISK_MB, 3.0)),
HOST_ATTRIBUTES_A), DRAINING);
-
HostOffer medium = setMode(new HostOffer(
offer("host2",
mesosScalar(CPUS, 1.0),
mesosScalar(RAM_MB, 3.0),
mesosScalar(DISK_MB, 2.0)),
HOST_ATTRIBUTES_A), DRAINING);
-
HostOffer large = setMode(new HostOffer(
offer("host3",
mesosScalar(CPUS, 10.0),
@@ -518,16 +547,18 @@ public class OfferManagerImplTest extends EasyMockTest {
mesosScalar(DISK_MB, 1.0)),
HOST_ATTRIBUTES_A), DRAINING);
+ expectFilterNone();
+
control.replay();
- cpuManager.addOffer(large);
- cpuManager.addOffer(medium);
- cpuManager.addOffer(small);
+ cpuManager.add(large);
+ cpuManager.add(medium);
+ cpuManager.add(small);
assertEquals(ImmutableList.of(small, medium, large),
- ImmutableList.copyOf(cpuManager.getOffers(GROUP_KEY)));
+ ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
assertEquals(ImmutableList.of(small, medium, large),
- ImmutableList.copyOf(cpuManager.getOffers()));
+ ImmutableList.copyOf(cpuManager.getAll()));
}
@Test
@@ -545,13 +576,14 @@ public class OfferManagerImplTest extends EasyMockTest {
driver,
settings,
statsProvider,
- new Deferment.DelayedDeferment(() -> RETURN_DELAY, executorMock));
+ new Deferment.DelayedDeferment(() -> RETURN_DELAY, executorMock),
+ schedulingFilter);
driver.declineOffer(OFFER_A_ID, OFFER_FILTER);
control.replay();
- offerManager.addOffer(OFFER_A);
+ offerManager.add(OFFER_A);
assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS));
clock.advance(RETURN_DELAY);
@@ -575,11 +607,154 @@ public class OfferManagerImplTest extends EasyMockTest {
control.replay();
- offerManager.banOffer(OFFER_A_ID);
- offerManager.addOffer(OFFER_A);
- offerManager.addOffer(sameAgent);
- offerManager.cancelOffer(OFFER_A_ID);
- offerManager.addOffer(sameAgent2);
- assertEquals(ImmutableSet.of(sameAgent2), offerManager.getOffers());
+ offerManager.ban(OFFER_A_ID);
+ offerManager.add(OFFER_A);
+ offerManager.add(sameAgent);
+ offerManager.cancel(OFFER_A_ID);
+ offerManager.add(sameAgent2);
+ assertEquals(ImmutableSet.of(sameAgent2), offerManager.getAll());
+ }
+
+ private void expectFilterNone() {
+ // Most tests will use a permissive scheduling filter
+ expect(schedulingFilter.filter(anyObject(), anyObject()))
+ .andReturn(ImmutableSet.of())
+ .anyTimes();
+ }
+
+ @Test
+ public void testGetMatchingSingleAgent() {
+ expectFilterNone();
+
+ control.replay();
+ offerManager.add(OFFER_A);
+ assertEquals(Optional.of(OFFER_A),
+ offerManager.getMatching(OFFER_A.getOffer().getAgentId(), EMPTY_REQUEST, false));
+ }
+
+ @Test
+ public void testGetMatchingNoGloballyBanned() {
+ expectFilterNone();
+
+ control.replay();
+ offerManager.add(OFFER_A);
+ assertEquals(0, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS));
+ offerManager.ban(OFFER_A_ID);
+ assertEquals(Optional.absent(),
+ offerManager.getMatching(OFFER_A.getOffer().getAgentId(), EMPTY_REQUEST, false));
+ assertEquals(1, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS));
+ }
+
+ @Test
+ public void testGetMatchingNoVetoed() {
+ // Calling getMatching when a veto is present should return an empty option. Additionally,
+ // it should not statically ban the offer if it is vetoed.
+ expect(schedulingFilter.filter(new UnusedResource(OFFER_A, false), EMPTY_REQUEST))
+ .andReturn(ImmutableSet.of(SchedulingFilter.Veto.dedicatedHostConstraintMismatch()));
+
+ control.replay();
+ offerManager.add(OFFER_A);
+ assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
+ assertEquals(Optional.absent(),
+ offerManager.getMatching(OFFER_A.getOffer().getAgentId(), EMPTY_REQUEST, false));
+ assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
+ }
+
+ @Test
+ public void testGetAllMatching() {
+ expectFilterNone();
+
+ control.replay();
+ offerManager.add(OFFER_A);
+ offerManager.add(OFFER_B);
+ offerManager.add(OFFER_C);
+ assertEquals(0, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
+ assertEquals(ImmutableSet.of(OFFER_A, OFFER_B, OFFER_C),
+ ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+ assertEquals(3, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
+ }
+
+ @Test
+ public void testGetAllMatchingNoGloballyBanned() {
+ expectFilterNone();
+
+ control.replay();
+ offerManager.add(OFFER_A);
+ offerManager.add(OFFER_B);
+ offerManager.add(OFFER_C);
+ assertEquals(0, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
+ assertEquals(0, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS));
+ offerManager.ban(OFFER_B.getOffer().getId());
+ assertEquals(ImmutableSet.of(OFFER_A, OFFER_C),
+ ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+ assertEquals(2, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
+ assertEquals(1, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS));
+ }
+
+ @Test
+ public void testGetAllMatchingNoStaticallyBanned() {
+ expectFilterNone();
+
+ control.replay();
+ offerManager.add(OFFER_A);
+ offerManager.add(OFFER_B);
+ offerManager.add(OFFER_C);
+ assertEquals(0, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
+ assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
+ offerManager.banForTaskGroup(OFFER_B.getOffer().getId(), GROUP_KEY);
+ assertEquals(ImmutableSet.of(OFFER_A, OFFER_C),
+ ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+ assertEquals(2, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
+ assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
+ assertEquals(ImmutableSet.of(Pair.of(OFFER_B.getOffer().getId(), GROUP_KEY)),
+ offerManager.getStaticBans());
+ }
+
+ @Test
+ public void testGetAllMatchingIgnoreNoCpusAndMem() {
+ expectFilterNone();
+
+ HostOffer empty = setMode(new HostOffer(
+ offer("host1",
+ mesosScalar(CPUS, 0),
+ mesosScalar(RAM_MB, 0),
+ mesosScalar(DISK_MB, 3.0)),
+ HOST_ATTRIBUTES_A), NONE);
+
+ control.replay();
+ offerManager.add(empty);
+ offerManager.add(OFFER_A);
+ assertEquals(0, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS));
+ assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
+ assertEquals(0, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
+ assertEquals(ImmutableSet.of(OFFER_A),
+ ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+ assertEquals(1, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
+ assertEquals(ImmutableSet.of(empty, OFFER_A),
+ ImmutableSet.copyOf(offerManager.getAll()));
+ }
+
+ @Test
+ public void testGetAllMatchingNoVetoed() {
+ // Calling getAllMatching should statically ban the offer as well if it is statically vetoed
+ expect(schedulingFilter.filter(new UnusedResource(OFFER_A, false), EMPTY_REQUEST))
+ .andReturn(ImmutableSet.of(SchedulingFilter.Veto.dedicatedHostConstraintMismatch()));
+ expect(schedulingFilter.filter(new UnusedResource(OFFER_B, false), EMPTY_REQUEST))
+ .andReturn(ImmutableSet.of());
+ expect(schedulingFilter.filter(new UnusedResource(OFFER_C, false), EMPTY_REQUEST))
+ .andReturn(ImmutableSet.of(SchedulingFilter.Veto.unsatisfiedLimit("test_limit")));
+
+ control.replay();
+ offerManager.add(OFFER_A);
+ offerManager.add(OFFER_B);
+ offerManager.add(OFFER_C);
+ assertEquals(0, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
+ assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
+ assertEquals(ImmutableSet.of(OFFER_B),
+ ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false)));
+ assertEquals(3, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
+ assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
+ assertEquals(ImmutableSet.of(Pair.of(OFFER_A.getOffer().getId(), GROUP_KEY)),
+ offerManager.getStaticBans());
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/test/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessorTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessorTest.java b/src/test/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessorTest.java
index c76b3e3..a346e44 100644
--- a/src/test/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessorTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessorTest.java
@@ -300,7 +300,7 @@ public class PendingTaskProcessorTest extends EasyMockTest {
}
private void expectOffers(HostOffer... offers) {
- expect(offerManager.getOffers()).andReturn(ImmutableSet.copyOf(offers));
+ expect(offerManager.getAll()).andReturn(ImmutableSet.copyOf(offers));
}
private void expectGetClusterState(IScheduledTask... returnedTasks) {
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java
index f14cba1..1061583 100644
--- a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java
@@ -82,7 +82,7 @@ public class PreemptorImplTest extends EasyMockTest {
slotCache = createMock(new Clazz<BiCache<PreemptionProposal, TaskGroupKey>>() { });
statsProvider = new FakeStatsProvider();
OfferManager offerManager = createMock(OfferManager.class);
- expect(offerManager.getOffer(anyObject(Protos.AgentID.class)))
+ expect(offerManager.get(anyObject(Protos.AgentID.class)))
.andReturn(Optional.of(OFFER))
.anyTimes();
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorModuleTest.java b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorModuleTest.java
index 97c238c..815a3ef 100644
--- a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorModuleTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorModuleTest.java
@@ -25,8 +25,8 @@ import org.apache.aurora.gen.AssignedTask;
import org.apache.aurora.scheduler.config.CliOptions;
import org.apache.aurora.scheduler.filter.AttributeAggregate;
import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.scheduling.TaskAssigner;
import org.apache.aurora.scheduler.state.StateManager;
-import org.apache.aurora.scheduler.state.TaskAssigner;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/test/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelectorTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelectorTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelectorTest.java
new file mode 100644
index 0000000..627055c
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelectorTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.scheduling;
+
+import com.google.common.collect.ImmutableList;
+
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.resources.ResourceBag;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB;
+import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask;
+import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty;
+import static org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * Unit tests for {@link FirstFitOfferSelector}, exercised through the {@link OfferSelector}
+ * interface.
+ */
+public class FirstFitOfferSelectorTest extends EasyMockTest {
+
+  private static final IAssignedTask TASK = makeTask("id", JOB).getAssignedTask();
+  // A request for TASK's config with an empty resource bag and an empty attribute aggregate.
+  private static final ResourceRequest EMPTY_REQUEST =
+      new ResourceRequest(TASK.getTask(), ResourceBag.EMPTY, empty());
+
+  private OfferSelector firstFitOfferSelector;
+
+  @Before
+  public void setUp() {
+    firstFitOfferSelector = new FirstFitOfferSelector();
+  }
+
+  /** With no candidate offers, nothing is selected. */
+  @Test
+  public void testNoOffers() {
+    Iterable<HostOffer> noOffers = ImmutableList.of();
+
+    control.replay();
+
+    assertFalse(firstFitOfferSelector.select(noOffers, EMPTY_REQUEST).isPresent());
+  }
+
+  /** With several candidate offers, the first one in iteration order is selected. */
+  @Test
+  public void testReturnFirstOffer() {
+    HostOffer first = createMock(HostOffer.class);
+    HostOffer second = createMock(HostOffer.class);
+    Iterable<HostOffer> candidates = ImmutableList.of(first, second);
+
+    control.replay();
+
+    assertEquals(first, firstFitOfferSelector.select(candidates, EMPTY_REQUEST).get());
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/test/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImplTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImplTest.java
new file mode 100644
index 0000000..e094950
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImplTest.java
@@ -0,0 +1,374 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.scheduling;
+
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.Attribute;
+import org.apache.aurora.gen.HostAttributes;
+import org.apache.aurora.gen.JobKey;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.TierManager;
+import org.apache.aurora.scheduler.base.InstanceKeys;
+import org.apache.aurora.scheduler.base.TaskGroupKey;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.filter.AttributeAggregate;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
+import org.apache.aurora.scheduler.mesos.MesosTaskFactory;
+import org.apache.aurora.scheduler.offers.OfferManager;
+import org.apache.aurora.scheduler.resources.ResourceBag;
+import org.apache.aurora.scheduler.state.StateChangeResult;
+import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.apache.aurora.scheduler.testing.FakeStatsProvider;
+import org.apache.aurora.scheduler.updater.UpdateAgentReserver;
+import org.apache.mesos.v1.Protos.AgentID;
+import org.apache.mesos.v1.Protos.FrameworkID;
+import org.apache.mesos.v1.Protos.OfferID;
+import org.apache.mesos.v1.Protos.TaskID;
+import org.apache.mesos.v1.Protos.TaskInfo;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.ScheduleStatus.LOST;
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.scheduler.base.TaskTestUtil.DEV_TIER;
+import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB;
+import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask;
+import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty;
+import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosRange;
+import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosScalar;
+import static org.apache.aurora.scheduler.resources.ResourceTestUtil.offer;
+import static org.apache.aurora.scheduler.resources.ResourceType.CPUS;
+import static org.apache.aurora.scheduler.resources.ResourceType.PORTS;
+import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB;
+import static org.apache.aurora.scheduler.scheduling.TaskAssignerImpl.ASSIGNER_LAUNCH_FAILURES;
+import static org.apache.aurora.scheduler.scheduling.TaskAssignerImpl.LAUNCH_FAILED_MSG;
+import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import static org.apache.mesos.v1.Protos.Offer;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.anyString;
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+public class TaskAssignerImplTest extends EasyMockTest {
+
+ private static final int PORT = 1000;
+ private static final Offer MESOS_OFFER =
+ offer(mesosScalar(CPUS, 1), mesosScalar(RAM_MB, 1024), mesosRange(PORTS, PORT));
+ private static final String SLAVE_ID = MESOS_OFFER.getAgentId().getValue();
+ private static final HostOffer OFFER =
+ new HostOffer(MESOS_OFFER, IHostAttributes.build(new HostAttributes()
+ .setHost(MESOS_OFFER.getHostname())
+ .setAttributes(ImmutableSet.of(
+ new Attribute("host", ImmutableSet.of(MESOS_OFFER.getHostname()))))));
+ private static final IScheduledTask TASK = makeTask("id", JOB);
+ private static final TaskGroupKey GROUP_KEY = TaskGroupKey.from(TASK.getAssignedTask().getTask());
+ private static final TaskInfo TASK_INFO = TaskInfo.newBuilder()
+ .setName("taskName")
+ .setTaskId(TaskID.newBuilder().setValue(Tasks.id(TASK)))
+ .setAgentId(MESOS_OFFER.getAgentId())
+ .build();
+ private static final IInstanceKey INSTANCE_KEY =
+ InstanceKeys.from(JOB, TASK.getAssignedTask().getInstanceId());
+ private static final Map<String, TaskGroupKey> NO_RESERVATION = ImmutableMap.of();
+ private static final HostOffer OFFER_2 = new HostOffer(
+ Offer.newBuilder()
+ .setId(OfferID.newBuilder().setValue("offerId0"))
+ .setFrameworkId(FrameworkID.newBuilder().setValue("frameworkId"))
+ .setAgentId(AgentID.newBuilder().setValue("slaveId0"))
+ .setHostname("hostName0")
+ .addResources(mesosRange(PORTS, PORT))
+ .addResources(mesosScalar(CPUS, 1))
+ .addResources(mesosScalar(RAM_MB, 1024))
+ .build(),
+ IHostAttributes.build(new HostAttributes()));
+
+ private static final Set<String> NO_ASSIGNMENT = ImmutableSet.of();
+
+ private AttributeAggregate aggregate;
+ private ResourceRequest resourceRequest;
+
+ private MutableStoreProvider storeProvider;
+ private StateManager stateManager;
+ private MesosTaskFactory taskFactory;
+ private OfferManager offerManager;
+ private TaskAssignerImpl assigner;
+ private TierManager tierManager;
+ private FakeStatsProvider statsProvider;
+ private UpdateAgentReserver updateAgentReserver;
+
+  /**
+   * Creates fresh mocks for every collaborator and wires them into the {@link TaskAssignerImpl}
+   * under test, along with a resource request built from {@code TASK}.
+   */
+  @Before
+  public void setUp() throws Exception {
+    // Mocked collaborators.
+    stateManager = createMock(StateManager.class);
+    taskFactory = createMock(MesosTaskFactory.class);
+    offerManager = createMock(OfferManager.class);
+    tierManager = createMock(TierManager.class);
+    updateAgentReserver = createMock(UpdateAgentReserver.class);
+    storeProvider = createMock(MutableStoreProvider.class);
+    statsProvider = new FakeStatsProvider();
+
+    // The aggregate is kept in a field so tests can inspect it after calling the assigner.
+    aggregate = empty();
+    resourceRequest =
+        new ResourceRequest(TASK.getAssignedTask().getTask(), ResourceBag.EMPTY, aggregate);
+
+    // TODO(jly): FirstFitOfferSelector returns the first offer which is what we want for testing,
+    // but if its implementation becomes more complex we may need to replace it with a fake.
+    assigner = new TaskAssignerImpl(
+        stateManager,
+        taskFactory,
+        offerManager,
+        tierManager,
+        updateAgentReserver,
+        statsProvider,
+        new FirstFitOfferSelector());
+  }
+
+  /** An empty task set yields no assignments; no mock expectations are recorded. */
+  @Test
+  public void testAssignNoTasks() throws Exception {
+    ImmutableSet<IAssignedTask> noTasks = ImmutableSet.of();
+
+    control.replay();
+
+    assertEquals(NO_ASSIGNMENT, assigner.maybeAssign(storeProvider, null, null, noTasks, null));
+  }
+
+  /**
+   * When launching on the first matching offer fails, the task is moved from PENDING to LOST,
+   * the launch-failure counter is incremented, and nothing is reported as assigned.
+   */
+  @Test
+  public void testAssignmentClearedOnError() throws Exception {
+    IAssignedTask assignedTask = TASK.getAssignedTask();
+    ITaskConfig taskConfig = assignedTask.getTask();
+
+    expectNoUpdateReservations(1);
+    expect(offerManager.getAllMatching(GROUP_KEY, resourceRequest, false))
+        .andReturn(ImmutableSet.of(OFFER, OFFER_2));
+    // The launch attempt against the first offer throws.
+    offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO);
+    expectLastCall().andThrow(new OfferManager.LaunchException("expected"));
+    expect(tierManager.getTier(taskConfig)).andReturn(DEV_TIER);
+    expectAssignTask(MESOS_OFFER);
+    expect(stateManager.changeState(
+        storeProvider, Tasks.id(TASK), Optional.of(PENDING), LOST, LAUNCH_FAILED_MSG))
+        .andReturn(StateChangeResult.SUCCESS);
+    expect(taskFactory.createFrom(assignedTask, MESOS_OFFER, false)).andReturn(TASK_INFO);
+
+    control.replay();
+
+    ImmutableSet<IAssignedTask> tasks = ImmutableSet.of(
+        assignedTask,
+        makeTask("id2", JOB).getAssignedTask(),
+        makeTask("id3", JOB).getAssignedTask());
+
+    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_LAUNCH_FAILURES));
+    // Ensures scheduling loop terminates on the first launch failure.
+    assertEquals(
+        NO_ASSIGNMENT,
+        assigner.maybeAssign(
+            storeProvider,
+            resourceRequest,
+            TaskGroupKey.from(taskConfig),
+            tasks,
+            NO_RESERVATION));
+    assertEquals(1L, statsProvider.getLongValue(ASSIGNER_LAUNCH_FAILURES));
+  }
+
+  /**
+   * An offer whose agent is reserved for a different task group is not used, so the task stays
+   * unassigned.
+   */
+  @Test
+  public void testAssignmentSkippedForReservedSlave() throws Exception {
+    ITaskConfig taskConfig = TASK.getAssignedTask().getTask();
+    // The only offered agent is reserved for a task group belonging to another job.
+    TaskGroupKey otherGroup = TaskGroupKey.from(
+        ITaskConfig.build(new TaskConfig().setJob(new JobKey("other", "e", "n"))));
+    Map<String, TaskGroupKey> reservations = ImmutableMap.of(SLAVE_ID, otherGroup);
+
+    expectNoUpdateReservations(0);
+    expect(tierManager.getTier(taskConfig)).andReturn(DEV_TIER);
+    expect(offerManager.getAllMatching(GROUP_KEY, resourceRequest, false))
+        .andReturn(ImmutableSet.of(OFFER));
+
+    control.replay();
+
+    assertEquals(
+        NO_ASSIGNMENT,
+        assigner.maybeAssign(
+            storeProvider,
+            resourceRequest,
+            TaskGroupKey.from(taskConfig),
+            ImmutableSet.of(TASK.getAssignedTask()),
+            reservations));
+  }
+
+  /**
+   * A reservation restricts which tasks an agent accepts, not which agents a task may use: a
+   * task whose group holds a reservation on one agent should still be tried against other,
+   * unreserved agents.
+   */
+  @Test
+  public void testTaskWithReservedSlaveLandsElsewhere() throws Exception {
+    IAssignedTask assignedTask = TASK.getAssignedTask();
+    ITaskConfig taskConfig = assignedTask.getTask();
+    Offer unreservedOffer = OFFER_2.getOffer();
+
+    expectNoUpdateReservations(1);
+    // The unreserved offer is listed first, so that is where the task is expected to land.
+    expect(offerManager.getAllMatching(GROUP_KEY, resourceRequest, false))
+        .andReturn(ImmutableSet.of(OFFER_2, OFFER));
+    expect(tierManager.getTier(taskConfig)).andReturn(DEV_TIER);
+    expectAssignTask(unreservedOffer);
+    expect(taskFactory.createFrom(assignedTask, unreservedOffer, false)).andReturn(TASK_INFO);
+    offerManager.launchTask(unreservedOffer.getId(), TASK_INFO);
+
+    control.replay();
+
+    Map<String, TaskGroupKey> reservations = ImmutableMap.of(SLAVE_ID, GROUP_KEY);
+    assertEquals(
+        ImmutableSet.of(Tasks.id(TASK)),
+        assigner.maybeAssign(
+            storeProvider,
+            resourceRequest,
+            TaskGroupKey.from(taskConfig),
+            ImmutableSet.of(assignedTask),
+            reservations));
+  }
+
+  /**
+   * Mapping the offer's resources onto a copy of TASK whose assigned ports were cleared is
+   * expected to reproduce TASK's original assigned task.
+   */
+  @Test
+  public void testResourceMapperCallback() {
+    AssignedTask portsCleared = TASK.newBuilder().getAssignedTask();
+    portsCleared.unsetAssignedPorts();
+    IAssignedTask input = IAssignedTask.build(portsCleared);
+
+    control.replay();
+
+    IAssignedTask expected = TASK.getAssignedTask();
+    assertEquals(expected, assigner.mapAndAssignResources(MESOS_OFFER, input));
+  }
+
+  /**
+   * A task whose instance holds an update reservation on an agent is launched on that agent's
+   * matching offer, and the reservation is released.
+   */
+  @Test
+  public void testAssignToReservedAgent() throws Exception {
+    IAssignedTask assignedTask = TASK.getAssignedTask();
+    ITaskConfig taskConfig = assignedTask.getTask();
+
+    // The update reserver points this instance at SLAVE_ID.
+    expect(updateAgentReserver.hasReservations(GROUP_KEY)).andReturn(true);
+    expect(updateAgentReserver.getAgent(INSTANCE_KEY)).andReturn(Optional.of(SLAVE_ID));
+    updateAgentReserver.release(SLAVE_ID, INSTANCE_KEY);
+    // A matching offer exists on the reserved agent.
+    expect(offerManager.getMatching(MESOS_OFFER.getAgentId(), resourceRequest, false))
+        .andReturn(Optional.of(OFFER));
+    expectAssignTask(MESOS_OFFER);
+    offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO);
+    expect(tierManager.getTier(taskConfig)).andReturn(DEV_TIER);
+    expect(taskFactory.createFrom(assignedTask, MESOS_OFFER, false)).andReturn(TASK_INFO);
+
+    control.replay();
+
+    Map<String, TaskGroupKey> reservations = ImmutableMap.of(SLAVE_ID, GROUP_KEY);
+    assertEquals(
+        ImmutableSet.of(Tasks.id(TASK)),
+        assigner.maybeAssign(
+            storeProvider,
+            resourceRequest,
+            TaskGroupKey.from(taskConfig),
+            ImmutableSet.of(assignedTask),
+            reservations));
+    // After a successful assignment the request's aggregate must no longer equal an empty one.
+    assertNotEquals(empty(), aggregate);
+  }
+
+  /**
+   * A task whose instance holds an update reservation on an agent that currently has no
+   * matching offer is left unassigned, and the attribute aggregate still equals an empty one.
+   */
+  @Test
+  public void testAssignReservedAgentWhenOfferNotReady() throws Exception {
+    expect(updateAgentReserver.hasReservations(GROUP_KEY)).andReturn(true);
+    expect(updateAgentReserver.getAgent(INSTANCE_KEY)).andReturn(Optional.of(SLAVE_ID));
+    // No matching offer is available from the reserved agent.
+    expect(offerManager.getMatching(MESOS_OFFER.getAgentId(), resourceRequest, false))
+        .andReturn(Optional.absent());
+    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
+
+    control.replay();
+
+    assertEquals(
+        NO_ASSIGNMENT,
+        assigner.maybeAssign(
+            storeProvider,
+            resourceRequest,
+            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
+            ImmutableSet.of(TASK.getAssignedTask()),
+            ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
+    assertEquals(empty(), aggregate);
+  }
+
+  /**
+   * Two tasks from the same group: the first is placed through its update reservation, the
+   * second (which has no reserved agent) goes through the regular offer-matching path. Both
+   * are expected to end up assigned.
+   */
+  @Test
+  public void testAssignWithMixOfReservedAndNotReserved() throws Exception {
+    IAssignedTask reservedTask = TASK.getAssignedTask();
+    int unreservedInstanceId = 9999;
+    IScheduledTask secondTask = makeTask("another-task", JOB, unreservedInstanceId);
+    TaskInfo secondTaskInfo = TaskInfo.newBuilder()
+        .setName("another-task")
+        .setTaskId(TaskID.newBuilder().setValue(Tasks.id(secondTask)))
+        .setAgentId(MESOS_OFFER.getAgentId())
+        .build();
+
+    expect(tierManager.getTier(reservedTask.getTask())).andReturn(DEV_TIER);
+
+    // Reserved path for the first task.
+    expect(updateAgentReserver.hasReservations(GROUP_KEY)).andReturn(true);
+    expect(updateAgentReserver.getAgent(INSTANCE_KEY)).andReturn(Optional.of(SLAVE_ID));
+    updateAgentReserver.release(SLAVE_ID, INSTANCE_KEY);
+    expect(offerManager.getMatching(MESOS_OFFER.getAgentId(), resourceRequest, false))
+        .andReturn(Optional.of(OFFER));
+    expectAssignTask(MESOS_OFFER);
+    offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO);
+    expect(taskFactory.createFrom(reservedTask, MESOS_OFFER, false)).andReturn(TASK_INFO);
+
+    // Normal scheduling loop for the remaining task...
+    expect(updateAgentReserver.getAgent(InstanceKeys.from(JOB, unreservedInstanceId)))
+        .andReturn(Optional.absent());
+    expect(offerManager.getAllMatching(GROUP_KEY, resourceRequest, false))
+        .andReturn(ImmutableSet.of(OFFER));
+    expect(updateAgentReserver.getReservations(OFFER.getOffer().getAgentId().getValue()))
+        .andReturn(ImmutableSet.of());
+    expectAssignTask(MESOS_OFFER, secondTask);
+    offerManager.launchTask(MESOS_OFFER.getId(), secondTaskInfo);
+    expect(taskFactory.createFrom(secondTask.getAssignedTask(), MESOS_OFFER, false))
+        .andReturn(secondTaskInfo);
+
+    control.replay();
+
+    Map<String, TaskGroupKey> reservations = ImmutableMap.of(SLAVE_ID, GROUP_KEY);
+    assertEquals(
+        Tasks.ids(TASK, secondTask),
+        assigner.maybeAssign(
+            storeProvider,
+            resourceRequest,
+            GROUP_KEY,
+            ImmutableSet.of(reservedTask, secondTask.getAssignedTask()),
+            reservations));
+    assertNotEquals(empty(), aggregate);
+  }
+
+  /** Expects {@code TASK} to be assigned to the host and agent of {@code offer}. */
+  private void expectAssignTask(Offer offer) {
+    expectAssignTask(offer, TASK);
+  }
+
+  /**
+   * Expects one {@code stateManager.assignTask} call for {@code task} against the hostname and
+   * agent of {@code offer}; the final argument is matched with {@code anyObject()}. The mock
+   * answers with the task's own assigned task.
+   */
+  private void expectAssignTask(Offer offer, IScheduledTask task) {
+    IAssignedTask assigned = task.getAssignedTask();
+    expect(stateManager.assignTask(
+        eq(storeProvider),
+        eq(Tasks.id(task)),
+        eq(offer.getHostname()),
+        eq(offer.getAgentId()),
+        anyObject()))
+        .andReturn(assigned);
+  }
+
+  /**
+   * Expects the update reserver to report no reservations for any task group, followed by
+   * {@code offers} per-agent reservation lookups that each come back empty.
+   */
+  private void expectNoUpdateReservations(int offers) {
+    expect(updateAgentReserver.hasReservations(anyObject())).andReturn(false);
+    for (int remaining = offers; remaining > 0; remaining--) {
+      expect(updateAgentReserver.getReservations(anyString())).andReturn(ImmutableSet.of());
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
index 7a4525a..260ff9d 100644
--- a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
@@ -46,9 +46,7 @@ import org.apache.aurora.scheduler.preemptor.BiCache;
import org.apache.aurora.scheduler.preemptor.Preemptor;
import org.apache.aurora.scheduler.resources.ResourceBag;
import org.apache.aurora.scheduler.resources.ResourceManager;
-import org.apache.aurora.scheduler.scheduling.TaskScheduler.TaskSchedulerImpl;
import org.apache.aurora.scheduler.state.PubsubTestUtil;
-import org.apache.aurora.scheduler.state.TaskAssigner;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/test/java/org/apache/aurora/scheduler/state/FirstFitTaskAssignerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/FirstFitTaskAssignerTest.java b/src/test/java/org/apache/aurora/scheduler/state/FirstFitTaskAssignerTest.java
deleted file mode 100644
index 58f9de2..0000000
--- a/src/test/java/org/apache/aurora/scheduler/state/FirstFitTaskAssignerTest.java
+++ /dev/null
@@ -1,539 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.state;
-
-import java.util.Map;
-import java.util.Set;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-
-import org.apache.aurora.common.testing.easymock.EasyMockTest;
-import org.apache.aurora.gen.AssignedTask;
-import org.apache.aurora.gen.Attribute;
-import org.apache.aurora.gen.HostAttributes;
-import org.apache.aurora.gen.JobKey;
-import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.scheduler.HostOffer;
-import org.apache.aurora.scheduler.TierManager;
-import org.apache.aurora.scheduler.base.InstanceKeys;
-import org.apache.aurora.scheduler.base.TaskGroupKey;
-import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.filter.AttributeAggregate;
-import org.apache.aurora.scheduler.filter.SchedulingFilter;
-import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
-import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
-import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
-import org.apache.aurora.scheduler.mesos.MesosTaskFactory;
-import org.apache.aurora.scheduler.offers.OfferManager;
-import org.apache.aurora.scheduler.resources.ResourceBag;
-import org.apache.aurora.scheduler.state.TaskAssigner.FirstFitTaskAssigner;
-import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.apache.aurora.scheduler.testing.FakeStatsProvider;
-import org.apache.aurora.scheduler.updater.UpdateAgentReserver;
-import org.apache.mesos.v1.Protos.AgentID;
-import org.apache.mesos.v1.Protos.FrameworkID;
-import org.apache.mesos.v1.Protos.OfferID;
-import org.apache.mesos.v1.Protos.TaskID;
-import org.apache.mesos.v1.Protos.TaskInfo;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.aurora.gen.ScheduleStatus.LOST;
-import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-import static org.apache.aurora.scheduler.base.TaskTestUtil.DEV_TIER;
-import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB;
-import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask;
-import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty;
-import static org.apache.aurora.scheduler.resources.ResourceManager.bagFromMesosResources;
-import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosRange;
-import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosScalar;
-import static org.apache.aurora.scheduler.resources.ResourceTestUtil.offer;
-import static org.apache.aurora.scheduler.resources.ResourceType.CPUS;
-import static org.apache.aurora.scheduler.resources.ResourceType.PORTS;
-import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB;
-import static org.apache.aurora.scheduler.state.TaskAssigner.FirstFitTaskAssigner.ASSIGNER_EVALUATED_OFFERS;
-import static org.apache.aurora.scheduler.state.TaskAssigner.FirstFitTaskAssigner.ASSIGNER_LAUNCH_FAILURES;
-import static org.apache.aurora.scheduler.state.TaskAssigner.FirstFitTaskAssigner.LAUNCH_FAILED_MSG;
-import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import static org.apache.mesos.v1.Protos.Offer;
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.anyString;
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-
-public class FirstFitTaskAssignerTest extends EasyMockTest {
-
- private static final int PORT = 1000;
- private static final Offer MESOS_OFFER =
- offer(mesosScalar(CPUS, 1), mesosScalar(RAM_MB, 1024), mesosRange(PORTS, PORT));
- private static final String SLAVE_ID = MESOS_OFFER.getAgentId().getValue();
- private static final HostOffer OFFER =
- new HostOffer(MESOS_OFFER, IHostAttributes.build(new HostAttributes()
- .setHost(MESOS_OFFER.getHostname())
- .setAttributes(ImmutableSet.of(
- new Attribute("host", ImmutableSet.of(MESOS_OFFER.getHostname()))))));
- private static final IScheduledTask TASK = makeTask("id", JOB);
- private static final TaskGroupKey GROUP_KEY = TaskGroupKey.from(TASK.getAssignedTask().getTask());
- private static final TaskInfo TASK_INFO = TaskInfo.newBuilder()
- .setName("taskName")
- .setTaskId(TaskID.newBuilder().setValue(Tasks.id(TASK)))
- .setAgentId(MESOS_OFFER.getAgentId())
- .build();
- private static final IInstanceKey INSTANCE_KEY =
- InstanceKeys.from(JOB, TASK.getAssignedTask().getInstanceId());
- private static final Map<String, TaskGroupKey> NO_RESERVATION = ImmutableMap.of();
- private static final UnusedResource UNUSED = new UnusedResource(
- bagFromMesosResources(MESOS_OFFER.getResourcesList()),
- OFFER.getAttributes());
- private static final HostOffer OFFER_2 = new HostOffer(
- Offer.newBuilder()
- .setId(OfferID.newBuilder().setValue("offerId0"))
- .setFrameworkId(FrameworkID.newBuilder().setValue("frameworkId"))
- .setAgentId(AgentID.newBuilder().setValue("slaveId0"))
- .setHostname("hostName0")
- .addResources(mesosRange(PORTS, PORT))
- .addResources(mesosScalar(CPUS, 1))
- .addResources(mesosScalar(RAM_MB, 1024))
- .build(),
- IHostAttributes.build(new HostAttributes()));
-
- private static final Set<String> NO_ASSIGNMENT = ImmutableSet.of();
-
- private ResourceRequest resourceRequest;
-
- private MutableStoreProvider storeProvider;
- private StateManager stateManager;
- private SchedulingFilter filter;
- private MesosTaskFactory taskFactory;
- private OfferManager offerManager;
- private FirstFitTaskAssigner assigner;
- private TierManager tierManager;
- private FakeStatsProvider statsProvider;
- private UpdateAgentReserver updateAgentReserver;
-
- @Before
- public void setUp() throws Exception {
- storeProvider = createMock(MutableStoreProvider.class);
- filter = createMock(SchedulingFilter.class);
- taskFactory = createMock(MesosTaskFactory.class);
- stateManager = createMock(StateManager.class);
- offerManager = createMock(OfferManager.class);
- tierManager = createMock(TierManager.class);
- updateAgentReserver = createMock(UpdateAgentReserver.class);
- statsProvider = new FakeStatsProvider();
- assigner = new FirstFitTaskAssigner(
- stateManager,
- filter,
- taskFactory,
- offerManager,
- tierManager,
- updateAgentReserver,
- statsProvider);
- resourceRequest = new ResourceRequest(
- TASK.getAssignedTask().getTask(),
- ResourceBag.EMPTY,
- empty());
- }
-
- @Test
- public void testAssignNoTasks() throws Exception {
- control.replay();
-
- assertEquals(
- NO_ASSIGNMENT,
- assigner.maybeAssign(storeProvider, null, null, ImmutableSet.of(), null));
- }
-
- @Test
- public void testAssignPartialNoVetoes() throws Exception {
- expectNoUpdateReservations(1);
- expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
- offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO);
- expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
- expect(filter.filter(UNUSED, resourceRequest)).andReturn(ImmutableSet.of());
- expectAssignTask(MESOS_OFFER);
- expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER, false))
- .andReturn(TASK_INFO);
-
- control.replay();
-
- AttributeAggregate aggregate = empty();
- assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
- assertEquals(
- ImmutableSet.of(Tasks.id(TASK)),
- assigner.maybeAssign(
- storeProvider,
- new ResourceRequest(TASK.getAssignedTask().getTask(), ResourceBag.EMPTY, aggregate),
- TaskGroupKey.from(TASK.getAssignedTask().getTask()),
- ImmutableSet.of(
- TASK.getAssignedTask(),
- makeTask("id2", JOB).getAssignedTask(),
- makeTask("id3", JOB).getAssignedTask()),
- ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
- assertNotEquals(empty(), aggregate);
- assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
- }
-
- @Test
- public void testAssignVetoesWithStaticBan() throws Exception {
- expectNoUpdateReservations(1);
- expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
- offerManager.banOfferForTaskGroup(MESOS_OFFER.getId(), GROUP_KEY);
- expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
- expect(filter.filter(UNUSED, resourceRequest))
- .andReturn(ImmutableSet.of(Veto.constraintMismatch("denied")));
-
- control.replay();
-
- assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
- assertEquals(
- NO_ASSIGNMENT,
- assigner.maybeAssign(
- storeProvider,
- resourceRequest,
- TaskGroupKey.from(TASK.getAssignedTask().getTask()),
- ImmutableSet.of(TASK.getAssignedTask()),
- NO_RESERVATION));
- assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
- }
-
- @Test
- public void testAssignVetoesWithNoStaticBan() throws Exception {
- expectNoUpdateReservations(1);
- expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
- expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
- expect(filter.filter(UNUSED, resourceRequest))
- .andReturn(ImmutableSet.of(Veto.unsatisfiedLimit("limit")));
-
- control.replay();
-
- assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
- assertEquals(
- NO_ASSIGNMENT,
- assigner.maybeAssign(
- storeProvider,
- resourceRequest,
- TaskGroupKey.from(TASK.getAssignedTask().getTask()),
- ImmutableSet.of(TASK.getAssignedTask()),
- NO_RESERVATION));
- assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
- }
-
- @Test
- public void testAssignmentClearedOnError() throws Exception {
- expectNoUpdateReservations(1);
- expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER, OFFER_2));
- offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO);
- expectLastCall().andThrow(new OfferManager.LaunchException("expected"));
- expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
- expect(filter.filter(UNUSED, resourceRequest)).andReturn(ImmutableSet.of());
- expectAssignTask(MESOS_OFFER);
- expect(stateManager.changeState(
- storeProvider,
- Tasks.id(TASK),
- Optional.of(PENDING),
- LOST,
- LAUNCH_FAILED_MSG))
- .andReturn(StateChangeResult.SUCCESS);
- expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER, false))
- .andReturn(TASK_INFO);
-
- control.replay();
-
- assertEquals(0L, statsProvider.getLongValue(ASSIGNER_LAUNCH_FAILURES));
- assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
- // Ensures scheduling loop terminates on the first launch failure.
- assertEquals(
- NO_ASSIGNMENT,
- assigner.maybeAssign(
- storeProvider,
- resourceRequest,
- TaskGroupKey.from(TASK.getAssignedTask().getTask()),
- ImmutableSet.of(
- TASK.getAssignedTask(),
- makeTask("id2", JOB).getAssignedTask(),
- makeTask("id3", JOB).getAssignedTask()),
- NO_RESERVATION));
- assertEquals(1L, statsProvider.getLongValue(ASSIGNER_LAUNCH_FAILURES));
- assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
- }
-
- @Test
- public void testAssignmentSkippedForReservedSlave() throws Exception {
- expectNoUpdateReservations(0);
- expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
- expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
-
- control.replay();
-
- assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
- assertEquals(
- NO_ASSIGNMENT,
- assigner.maybeAssign(
- storeProvider,
- resourceRequest,
- TaskGroupKey.from(TASK.getAssignedTask().getTask()),
- ImmutableSet.of(TASK.getAssignedTask()),
- ImmutableMap.of(SLAVE_ID, TaskGroupKey.from(
- ITaskConfig.build(new TaskConfig().setJob(new JobKey("other", "e", "n")))))));
- assertEquals(0, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
- }
-
- @Test
- public void testTaskWithReservedSlaveLandsElsewhere() throws Exception {
- // Ensures slave/task reservation relationship is only enforced in slave->task direction
- // and permissive in task->slave direction. In other words, a task with a slave reservation
- // should still be tried against other unreserved slaves.
- expectNoUpdateReservations(1);
- expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER_2, OFFER));
- expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
- expect(filter.filter(
- new UnusedResource(
- bagFromMesosResources(OFFER_2.getOffer().getResourcesList()),
- OFFER_2.getAttributes()),
- resourceRequest)).andReturn(ImmutableSet.of());
- expectAssignTask(OFFER_2.getOffer());
- expect(taskFactory.createFrom(TASK.getAssignedTask(), OFFER_2.getOffer(), false))
- .andReturn(TASK_INFO);
- offerManager.launchTask(OFFER_2.getOffer().getId(), TASK_INFO);
-
- control.replay();
-
- assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
- assertEquals(
- ImmutableSet.of(Tasks.id(TASK)),
- assigner.maybeAssign(
- storeProvider,
- resourceRequest,
- TaskGroupKey.from(TASK.getAssignedTask().getTask()),
- ImmutableSet.of(TASK.getAssignedTask()),
- ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
- assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
- }
-
- @Test
- public void testAssignerDoesNotReturnOnFirstMismatch() throws Exception {
- // Ensures scheduling loop does not terminate prematurely when the first mismatch is identified.
- HostOffer mismatched = new HostOffer(
- Offer.newBuilder()
- .setId(OfferID.newBuilder().setValue("offerId0"))
- .setFrameworkId(FrameworkID.newBuilder().setValue("frameworkId"))
- .setAgentId(AgentID.newBuilder().setValue("slaveId0"))
- .setHostname("hostName0")
- .addResources(mesosRange(PORTS, PORT))
- .addResources(mesosScalar(CPUS, 1))
- .addResources(mesosScalar(RAM_MB, 1024))
- .build(),
- IHostAttributes.build(new HostAttributes()));
-
- expectNoUpdateReservations(2);
- expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(mismatched, OFFER));
- expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
- expect(filter.filter(
- new UnusedResource(
- bagFromMesosResources(mismatched.getOffer().getResourcesList()),
- mismatched.getAttributes()),
- resourceRequest))
- .andReturn(ImmutableSet.of(Veto.constraintMismatch("constraint mismatch")));
- offerManager.banOfferForTaskGroup(mismatched.getOffer().getId(), GROUP_KEY);
- expect(filter.filter(
- new UnusedResource(
- bagFromMesosResources(MESOS_OFFER.getResourcesList()), OFFER.getAttributes()),
- resourceRequest))
- .andReturn(ImmutableSet.of());
-
- expectAssignTask(MESOS_OFFER);
- expect(taskFactory.createFrom(TASK.getAssignedTask(), OFFER.getOffer(), false))
- .andReturn(TASK_INFO);
- offerManager.launchTask(OFFER.getOffer().getId(), TASK_INFO);
-
- control.replay();
-
- assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
- assertEquals(
- ImmutableSet.of(Tasks.id(TASK)),
- assigner.maybeAssign(
- storeProvider,
- resourceRequest,
- TaskGroupKey.from(TASK.getAssignedTask().getTask()),
- ImmutableSet.of(TASK.getAssignedTask()),
- ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
- assertEquals(2L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
- }
-
- @Test
- public void testResourceMapperCallback() {
- AssignedTask builder = TASK.newBuilder().getAssignedTask();
- builder.unsetAssignedPorts();
-
- control.replay();
-
- assertEquals(
- TASK.getAssignedTask(),
- assigner.mapAndAssignResources(MESOS_OFFER, IAssignedTask.build(builder)));
- }
-
- @Test
- public void testAssignToReservedAgent() throws Exception {
- expect(updateAgentReserver.hasReservations(GROUP_KEY)).andReturn(true);
- expect(updateAgentReserver.getAgent(INSTANCE_KEY)).andReturn(Optional.of(SLAVE_ID));
- updateAgentReserver.release(SLAVE_ID, INSTANCE_KEY);
- expect(offerManager.getOffer(MESOS_OFFER.getAgentId())).andReturn(Optional.of(OFFER));
- expect(filter.filter(UNUSED, resourceRequest)).andReturn(ImmutableSet.of());
- expectAssignTask(MESOS_OFFER);
- offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO);
- expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
-
- expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER, false))
- .andReturn(TASK_INFO);
-
- control.replay();
-
- AttributeAggregate aggregate = empty();
- assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
- assertEquals(
- ImmutableSet.of(Tasks.id(TASK)),
- assigner.maybeAssign(
- storeProvider,
- new ResourceRequest(TASK.getAssignedTask().getTask(), ResourceBag.EMPTY, aggregate),
- TaskGroupKey.from(TASK.getAssignedTask().getTask()),
- ImmutableSet.of(
- TASK.getAssignedTask()),
- ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
- assertNotEquals(empty(), aggregate);
- assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
- }
-
- @Test
- public void testAssignReservedAgentWhenOfferNotReady() throws Exception {
- expect(updateAgentReserver.hasReservations(GROUP_KEY)).andReturn(true);
- expect(updateAgentReserver.getAgent(INSTANCE_KEY)).andReturn(Optional.of(SLAVE_ID));
- expect(offerManager.getOffer(MESOS_OFFER.getAgentId())).andReturn(Optional.of(OFFER));
- expect(filter.filter(UNUSED, resourceRequest))
- .andReturn(ImmutableSet.of(Veto.insufficientResources("cpu", 1)));
- expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
- offerManager.banOfferForTaskGroup(MESOS_OFFER.getId(), GROUP_KEY);
- expectLastCall();
-
- control.replay();
-
- AttributeAggregate aggregate = empty();
- assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
- assertEquals(
- ImmutableSet.of(),
- assigner.maybeAssign(
- storeProvider,
- new ResourceRequest(TASK.getAssignedTask().getTask(), ResourceBag.EMPTY, aggregate),
- TaskGroupKey.from(TASK.getAssignedTask().getTask()),
- ImmutableSet.of(TASK.getAssignedTask()),
- ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
- assertEquals(empty(), aggregate);
- assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
- }
-
- @Test
- public void testAssignWithMixOfReservedAndNotReserved() throws Exception {
- AttributeAggregate aggregate = empty();
- ResourceRequest resources = new ResourceRequest(
- TASK.getAssignedTask().getTask(), ResourceBag.EMPTY, aggregate);
- expect(updateAgentReserver.hasReservations(GROUP_KEY)).andReturn(true);
- expect(updateAgentReserver.getAgent(INSTANCE_KEY)).andReturn(Optional.of(SLAVE_ID));
- updateAgentReserver.release(SLAVE_ID, INSTANCE_KEY);
- expect(offerManager.getOffer(MESOS_OFFER.getAgentId())).andReturn(Optional.of(OFFER));
- expect(filter.filter(UNUSED, resourceRequest)).andReturn(ImmutableSet.of());
- expectAssignTask(MESOS_OFFER);
- offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO);
- expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
-
- expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER, false))
- .andReturn(TASK_INFO);
-
- // Normal scheduling loop for the remaining task...
- expect(updateAgentReserver.getAgent(InstanceKeys.from(JOB, 9999))).andReturn(Optional.absent());
- expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
- expect(updateAgentReserver.getReservations(OFFER.getOffer().getAgentId().getValue()))
- .andReturn(ImmutableSet.of());
- expect(filter.filter(UNUSED, resources))
- .andReturn(ImmutableSet.of(Veto.constraintMismatch("lol")));
- offerManager.banOfferForTaskGroup(MESOS_OFFER.getId(), GROUP_KEY);
-
- control.replay();
-
- assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
- assertEquals(
- ImmutableSet.of(Tasks.id(TASK)),
- assigner.maybeAssign(
- storeProvider,
- resources,
- TaskGroupKey.from(TASK.getAssignedTask().getTask()),
- ImmutableSet.of(
- TASK.getAssignedTask(),
- makeTask("another-task", JOB, 9999).getAssignedTask()),
- ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
- assertNotEquals(empty(), aggregate);
- assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
- }
-
- @Test
- public void testSkipsOffersWithNoMemAndNoCpu() {
- expectNoUpdateReservations(0);
- expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
-
- // Offer lacks CPU.
- Offer mesosOffer = offer(mesosScalar(RAM_MB, 1024), mesosRange(PORTS, PORT));
- HostOffer offer = new HostOffer(mesosOffer, IHostAttributes.build(new HostAttributes()
- .setHost(mesosOffer.getHostname())
- .setAttributes(ImmutableSet.of(
- new Attribute("host", ImmutableSet.of(mesosOffer.getHostname()))))));
-
- expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(offer));
-
- control.replay();
-
- assertEquals(
- NO_ASSIGNMENT,
- assigner.maybeAssign(
- storeProvider,
- resourceRequest,
- TaskGroupKey.from(TASK.getAssignedTask().getTask()),
- ImmutableSet.of(TASK.getAssignedTask()),
- NO_RESERVATION));
- assertEquals(0, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
- }
-
- private void expectAssignTask(Offer offer) {
- expect(stateManager.assignTask(
- eq(storeProvider),
- eq(Tasks.id(TASK)),
- eq(offer.getHostname()),
- eq(offer.getAgentId()),
- anyObject())).andReturn(TASK.getAssignedTask());
- }
-
- private void expectNoUpdateReservations(int offers) {
- expect(updateAgentReserver.hasReservations(anyObject())).andReturn(false);
- for (int i = 0; i < offers; i++) {
- expect(updateAgentReserver.getReservations(anyString())).andReturn(ImmutableSet.of());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/test/java/org/apache/aurora/scheduler/stats/AsyncStatsModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/stats/AsyncStatsModuleTest.java b/src/test/java/org/apache/aurora/scheduler/stats/AsyncStatsModuleTest.java
index f8e8023..dfcbb4a 100644
--- a/src/test/java/org/apache/aurora/scheduler/stats/AsyncStatsModuleTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/stats/AsyncStatsModuleTest.java
@@ -45,7 +45,7 @@ public class AsyncStatsModuleTest extends EasyMockTest {
@Test
public void testOfferAdapter() {
OfferManager offerManager = createMock(OfferManager.class);
- expect(offerManager.getOffers()).andReturn(ImmutableList.of(
+ expect(offerManager.getAll()).andReturn(ImmutableList.of(
new HostOffer(Protos.Offer.newBuilder()
.setId(Protos.OfferID.newBuilder().setValue("offerId"))
.setFrameworkId(Protos.FrameworkID.newBuilder().setValue("frameworkId"))
[2/3] aurora git commit: Enable custom offer scoring modules for task
assignment
Posted by wf...@apache.org.
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelector.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelector.java b/src/main/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelector.java
new file mode 100644
index 0000000..ee65bab
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelector.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.scheduling;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterables;
+
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
+
+public class FirstFitOfferSelector implements OfferSelector {
+ /** Picks the first offer in iteration order; absent when {@code offers} is empty. */
+ @Override
+ public Optional<HostOffer> select(Iterable<HostOffer> offers, ResourceRequest resourceRequest) {
+ // First fit does no scoring, so resourceRequest is not consulted here.
+ return Optional.fromNullable(Iterables.getFirst(offers, null));
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelectorModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelectorModule.java b/src/main/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelectorModule.java
new file mode 100644
index 0000000..4d36487
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelectorModule.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.scheduling;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Singleton;
+
+public class FirstFitOfferSelectorModule extends AbstractModule {
+ /** Binds {@link OfferSelector} to {@link FirstFitOfferSelector} in singleton scope. */
+ @Override
+ protected void configure() {
+ bind(OfferSelector.class).to(FirstFitOfferSelector.class);
+ bind(FirstFitOfferSelector.class).in(Singleton.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/scheduling/OfferSelector.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/OfferSelector.java b/src/main/java/org/apache/aurora/scheduler/scheduling/OfferSelector.java
new file mode 100644
index 0000000..c95b980
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/OfferSelector.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.scheduling;
+
+import com.google.common.base.Optional;
+
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
+
+/**
+ * Injected into {@link TaskAssignerImpl}, implementations score the offers available and return
+ * an option containing the offer to use.
+ */
+public interface OfferSelector {
+
+ /**
+ * Score offers that fit within the given {@link ResourceRequest} and return an option containing
+ * the offer to use for assignment.
+ *
+ * @param offers The offers that match the given {@link ResourceRequest}.
+ * @param resourceRequest The {@link ResourceRequest} for the task to assign.
+ * @return An {@link Optional} containing the offer to use, or absent if no offer is selected.
+ */
+ Optional<HostOffer> select(Iterable<HostOffer> offers, ResourceRequest resourceRequest);
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java b/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java
index 0796712..f72dacd 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java
@@ -139,8 +139,8 @@ public class SchedulingModule extends AbstractModule {
bind(new TypeLiteral<BiCache<String, TaskGroupKey>>() { }).in(Singleton.class);
bind(BiCache.BiCacheSettings.class).toInstance(
new BiCache.BiCacheSettings(options.reservationDuration, "reservation"));
- bind(TaskScheduler.class).to(TaskScheduler.TaskSchedulerImpl.class);
- bind(TaskScheduler.TaskSchedulerImpl.class).in(Singleton.class);
+ bind(TaskScheduler.class).to(TaskSchedulerImpl.class);
+ bind(TaskSchedulerImpl.class).in(Singleton.class);
expose(TaskScheduler.class);
}
});
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssigner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssigner.java
new file mode 100644
index 0000000..87619b5
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssigner.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.scheduling;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.aurora.scheduler.base.TaskGroupKey;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+
+import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+
+/**
+ * Responsible for matching tasks against offers and launching them.
+ */
+public interface TaskAssigner {
+ /**
+ * Tries to match each task against an offer. When a match is found, the assigner makes the
+ * appropriate changes to the task and requests task launch.
+ *
+ * @param storeProvider Storage provider.
+ * @param resourceRequest The request for resources being scheduled.
+ * @param groupKey Task group key.
+ * @param tasks Tasks to assign.
+ * @param preemptionReservations Agent ID to the task group that agent is reserved for.
+ * @return Successfully assigned task IDs.
+ */
+ Set<String> maybeAssign(
+ MutableStoreProvider storeProvider,
+ ResourceRequest resourceRequest,
+ TaskGroupKey groupKey,
+ Iterable<IAssignedTask> tasks,
+ Map<String, TaskGroupKey> preemptionReservations);
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImpl.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImpl.java
new file mode 100644
index 0000000..a1dd74f
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImpl.java
@@ -0,0 +1,284 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.scheduling;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+
+import org.apache.aurora.common.stats.StatsProvider;
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.TierManager;
+import org.apache.aurora.scheduler.base.InstanceKeys;
+import org.apache.aurora.scheduler.base.TaskGroupKey;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
+import org.apache.aurora.scheduler.mesos.MesosTaskFactory;
+import org.apache.aurora.scheduler.offers.OfferManager;
+import org.apache.aurora.scheduler.resources.ResourceManager;
+import org.apache.aurora.scheduler.resources.ResourceType;
+import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
+import org.apache.aurora.scheduler.updater.UpdateAgentReserver;
+import org.apache.mesos.v1.Protos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Objects.requireNonNull;
+
+import static org.apache.aurora.common.inject.TimedInterceptor.Timed;
+import static org.apache.aurora.gen.ScheduleStatus.LOST;
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+
+public class TaskAssignerImpl implements TaskAssigner {
+ private static final Logger LOG = LoggerFactory.getLogger(TaskAssignerImpl.class);
+
+ @VisibleForTesting
+ static final Optional<String> LAUNCH_FAILED_MSG =
+ Optional.of("Unknown exception attempting to schedule task.");
+ @VisibleForTesting
+ static final String ASSIGNER_LAUNCH_FAILURES = "assigner_launch_failures";
+
+ private final AtomicLong launchFailures;
+
+ private final StateManager stateManager;
+ private final MesosTaskFactory taskFactory;
+ private final OfferManager offerManager;
+ private final TierManager tierManager;
+ private final UpdateAgentReserver updateAgentReserver;
+ private final OfferSelector offerSelector;
+
+ @Inject
+ public TaskAssignerImpl(
+ StateManager stateManager,
+ MesosTaskFactory taskFactory,
+ OfferManager offerManager,
+ TierManager tierManager,
+ UpdateAgentReserver updateAgentReserver,
+ StatsProvider statsProvider,
+ OfferSelector offerSelector) {
+ // Collaborators are null-checked; statsProvider is dereferenced for the counter below.
+ this.stateManager = requireNonNull(stateManager);
+ this.taskFactory = requireNonNull(taskFactory);
+ this.offerManager = requireNonNull(offerManager);
+ this.tierManager = requireNonNull(tierManager);
+ this.launchFailures = statsProvider.makeCounter(ASSIGNER_LAUNCH_FAILURES);
+ this.updateAgentReserver = requireNonNull(updateAgentReserver);
+ this.offerSelector = requireNonNull(offerSelector);
+ }
+
+ @VisibleForTesting
+ IAssignedTask mapAndAssignResources(Protos.Offer offer, IAssignedTask task) {
+ IAssignedTask assigned = task;
+ for (ResourceType type : ResourceManager.getTaskResourceTypes(assigned)) {
+ if (type.getMapper().isPresent()) {
+ assigned = type.getMapper().get().mapAndAssign(offer, assigned);
+ }
+ }
+ return assigned;
+ }
+
+ private Protos.TaskInfo assign(
+ Storage.MutableStoreProvider storeProvider,
+ Protos.Offer offer,
+ String taskId,
+ boolean revocable) {
+ // Record the assignment via the state manager, then build the TaskInfo from the result.
+ String host = offer.getHostname();
+ IAssignedTask assigned = stateManager.assignTask(
+ storeProvider,
+ taskId,
+ host,
+ offer.getAgentId(),
+ task -> mapAndAssignResources(offer, task));
+ LOG.info(
+ "Offer on agent {} (id {}) is being assigned task for {}.",
+ host, offer.getAgentId().getValue(), taskId);
+ return taskFactory.createFrom(assigned, offer, revocable);
+ }
+
+ private void launchUsingOffer(
+ Storage.MutableStoreProvider storeProvider,
+ boolean revocable,
+ ResourceRequest resourceRequest,
+ IAssignedTask task,
+ HostOffer offer,
+ ImmutableSet.Builder<String> assignmentResult) throws OfferManager.LaunchException {
+ // Assign, fold the host attributes into the job aggregate, then request the launch.
+ String taskId = task.getTaskId();
+ Protos.TaskInfo taskInfo = assign(storeProvider, offer.getOffer(), taskId, revocable);
+ resourceRequest.getJobState().updateAttributeAggregate(offer.getAttributes());
+ try {
+ offerManager.launchTask(offer.getOffer().getId(), taskInfo);
+ assignmentResult.add(taskId);
+ } catch (OfferManager.LaunchException e) {
+ LOG.warn("Failed to launch task.", e);
+ launchFailures.incrementAndGet();
+
+ // The attempt to schedule the task failed, so we need to backpedal on the assignment.
+ // It is in the LOST state and a new task will move to PENDING to replace it.
+ // Should the state change fail due to storage issues, that's okay. The task will
+ // time out in the ASSIGNED state and be moved to LOST.
+ stateManager.changeState(
+ storeProvider,
+ taskId,
+ Optional.of(PENDING),
+ LOST,
+ LAUNCH_FAILED_MSG);
+ throw e;
+ }
+ }
+
+ private Iterable<IAssignedTask> maybeAssignReserved(
+ Iterable<IAssignedTask> tasks,
+ Storage.MutableStoreProvider storeProvider,
+ boolean revocable,
+ ResourceRequest resourceRequest,
+ TaskGroupKey groupKey,
+ ImmutableSet.Builder<String> assignmentResult) {
+ // No update reservations for this group: every task goes through the regular loop.
+ if (!updateAgentReserver.hasReservations(groupKey)) {
+ return tasks;
+ }
+
+ // Data structure to record which tasks should be excluded from the regular (non-reserved)
+ // scheduling loop. This is important because we release reservations once they are used,
+ // so we need to record them separately to avoid them being double-scheduled.
+ ImmutableSet.Builder<IInstanceKey> excludeBuilder = ImmutableSet.builder();
+
+ for (IAssignedTask task : tasks) {
+ IInstanceKey key = InstanceKeys.from(task.getTask().getJob(), task.getInstanceId());
+ Optional<String> maybeAgentId = updateAgentReserver.getAgent(key);
+ if (maybeAgentId.isPresent()) {
+ excludeBuilder.add(key);
+ Optional<HostOffer> offer = offerManager.getMatching(
+ Protos.AgentID.newBuilder().setValue(maybeAgentId.get()).build(),
+ resourceRequest,
+ revocable);
+ if (offer.isPresent()) {
+ try {
+ // The offer can still be veto'd because of changed constraints, or because the
+ // Scheduler hasn't been updated by Mesos yet...
+ launchUsingOffer(storeProvider,
+ revocable,
+ resourceRequest,
+ task,
+ offer.get(),
+ assignmentResult);
+ LOG.info("Used update reservation for {} on {}", key, maybeAgentId.get());
+ updateAgentReserver.release(maybeAgentId.get(), key);
+ } catch (OfferManager.LaunchException e) {
+ updateAgentReserver.release(maybeAgentId.get(), key);
+ }
+ } else {
+ LOG.info(
+ "Tried to reuse offer on {} for {}, but was not ready yet.",
+ maybeAgentId.get(),
+ key);
+ }
+ }
+ }
+
+ // Return only the tasks that didn't have reservations. Offers on agents that were reserved
+ // might not have been seen by Aurora yet, so we need to wait until the reservation expires
+ // before giving up and falling back to the regular OfferSelector-driven loop.
+ Set<IInstanceKey> toBeExcluded = excludeBuilder.build();
+ return Iterables.filter(tasks, t -> !toBeExcluded.contains(
+ InstanceKeys.from(t.getTask().getJob(), t.getInstanceId())));
+ }
+
+ /**
+ * Determine whether or not the offer is reserved for a different task via preemption or
+ * update affinity.
+ */
+ @SuppressWarnings("PMD.UselessParentheses") // TODO(jly): PMD bug, remove when upgrade from 5.5.3
+ private boolean isAgentReserved(HostOffer offer,
+ TaskGroupKey groupKey,
+ Map<String, TaskGroupKey> preemptionReservations) {
+ // Reserved when held for another group by preemption, or by any update reservation.
+ String agentId = offer.getOffer().getAgentId().getValue();
+ Optional<TaskGroupKey> reservedGroup = Optional.fromNullable(
+ preemptionReservations.get(agentId));
+
+ return (reservedGroup.isPresent() && !reservedGroup.get().equals(groupKey))
+ || !updateAgentReserver.getReservations(agentId).isEmpty();
+ }
+
+ @Timed("assigner_maybe_assign")
+ @Override
+ public Set<String> maybeAssign(
+ Storage.MutableStoreProvider storeProvider,
+ ResourceRequest resourceRequest,
+ TaskGroupKey groupKey,
+ Iterable<IAssignedTask> tasks,
+ Map<String, TaskGroupKey> preemptionReservations) {
+ // Nothing to schedule, so skip the tier lookup and all offer queries.
+ if (Iterables.isEmpty(tasks)) {
+ return ImmutableSet.of();
+ }
+
+ boolean revocable = tierManager.getTier(groupKey.getTask()).isRevocable();
+ ImmutableSet.Builder<String> assignmentResult = ImmutableSet.builder();
+
+ // Assign tasks reserved for a specific agent (e.g. for update affinity)
+ Iterable<IAssignedTask> nonReservedTasks = maybeAssignReserved(
+ tasks,
+ storeProvider,
+ revocable,
+ resourceRequest,
+ groupKey,
+ assignmentResult);
+
+ // Assign the rest of the non-reserved tasks
+ for (IAssignedTask task : nonReservedTasks) {
+ try {
+ // Get all offers that will satisfy the given ResourceRequest and that are not reserved
+ // for updates or preemption
+ FluentIterable<HostOffer> matchingOffers = FluentIterable
+ .from(offerManager.getAllMatching(groupKey, resourceRequest, revocable))
+ .filter(o -> !isAgentReserved(o, groupKey, preemptionReservations));
+
+ // Determine which is the optimal offer to select for the given request
+ Optional<HostOffer> optionalOffer = offerSelector.select(matchingOffers, resourceRequest);
+
+ // If no offer is chosen, continue to the next task
+ if (!optionalOffer.isPresent()) {
+ continue;
+ }
+
+ // Attempt to launch the task using the chosen offer
+ HostOffer offer = optionalOffer.get();
+ launchUsingOffer(storeProvider,
+ revocable,
+ resourceRequest,
+ task,
+ offer,
+ assignmentResult);
+ } catch (OfferManager.LaunchException e) {
+ // Any launch exception causes the scheduling round to terminate for this TaskGroup.
+ break;
+ }
+ }
+
+ return assignmentResult.build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImplModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImplModule.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImplModule.java
new file mode 100644
index 0000000..2ddd4f5
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImplModule.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.scheduling;
+
+import java.util.List;
+import javax.inject.Singleton;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.collect.ImmutableList;
+import com.google.inject.AbstractModule;
+import com.google.inject.Module;
+
+import org.apache.aurora.scheduler.app.MoreModules;
+import org.apache.aurora.scheduler.config.CliOptions;
+
+/**
+ * Guice module that binds {@link TaskAssigner} to {@link TaskAssignerImpl} and installs the
+ * modules named by the '-offer_selector_modules' flag, which are expected to customize offer
+ * selection.
+ */
+public class TaskAssignerImplModule extends AbstractModule {
+
+  /**
+   * Command line options read by this module.
+   */
+  @Parameters(separators = "=")
+  public static class Options {
+    @Parameter(names = "-offer_selector_modules",
+        description = "Guice modules for customizing the TaskAssignerImpl's OfferSelector.")
+    @SuppressWarnings("rawtypes")
+    public List<Class> offerSelectorModules =
+        ImmutableList.of(FirstFitOfferSelectorModule.class);
+  }
+
+  private final CliOptions cliOptions;
+
+  /**
+   * Creates a module configured from the scheduler's command line options.
+   *
+   * @param cliOptions Parsed command line options; {@code cliOptions.taskAssigner} is read when
+   *                   the module is configured.
+   */
+  public TaskAssignerImplModule(CliOptions cliOptions) {
+    this.cliOptions = cliOptions;
+  }
+
+  @Override
+  protected void configure() {
+    // Install every module listed in the flag before wiring up the assigner itself.
+    Options options = cliOptions.taskAssigner;
+    for (Module module : MoreModules.instantiateAll(options.offerSelectorModules, cliOptions)) {
+      install(module);
+    }
+
+    // TaskAssignerImpl is shared as a singleton behind the TaskAssigner interface.
+    bind(TaskAssigner.class).to(TaskAssignerImpl.class);
+    bind(TaskAssignerImpl.class).in(Singleton.class);
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
index 0002b0c..3c38f95 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
@@ -13,55 +13,10 @@
*/
package org.apache.aurora.scheduler.scheduling;
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-import java.util.Map;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
-import javax.inject.Inject;
-import javax.inject.Qualifier;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Sets;
-import com.google.common.eventbus.Subscribe;
-
-import org.apache.aurora.common.inject.TimedInterceptor.Timed;
-import org.apache.aurora.common.stats.Stats;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.base.TaskGroupKey;
-import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
-import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
-import org.apache.aurora.scheduler.filter.AttributeAggregate;
-import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
-import org.apache.aurora.scheduler.preemptor.BiCache;
-import org.apache.aurora.scheduler.preemptor.Preemptor;
-import org.apache.aurora.scheduler.resources.ResourceBag;
-import org.apache.aurora.scheduler.state.TaskAssigner;
import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-import static java.util.Objects.requireNonNull;
-
-import static java.util.stream.Collectors.toMap;
-
-import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-import static org.apache.aurora.scheduler.resources.ResourceManager.bagFromResources;
/**
* Enables scheduling and preemption of tasks.
@@ -77,150 +32,4 @@ public interface TaskScheduler extends EventSubscriber {
* task ID was not present in the result.
*/
Set<String> schedule(MutableStoreProvider storeProvider, Iterable<String> taskIds);
-
- /**
- * An asynchronous task scheduler. Scheduling of tasks is performed on a delay, where each task
- * backs off after a failed scheduling attempt.
- * <p>
- * Pending tasks are advertised to the scheduler via internal pubsub notifications.
- */
- class TaskSchedulerImpl implements TaskScheduler {
- /**
- * Binding annotation for the time duration of reservations.
- */
- @VisibleForTesting
- @Qualifier
- @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
- public @interface ReservationDuration { }
-
- private static final Logger LOG = LoggerFactory.getLogger(TaskSchedulerImpl.class);
-
- private final TaskAssigner assigner;
- private final Preemptor preemptor;
- private final ExecutorSettings executorSettings;
- private final BiCache<String, TaskGroupKey> reservations;
-
- private final AtomicLong attemptsFired = Stats.exportLong("schedule_attempts_fired");
- private final AtomicLong attemptsFailed = Stats.exportLong("schedule_attempts_failed");
- private final AtomicLong attemptsNoMatch = Stats.exportLong("schedule_attempts_no_match");
-
- @Inject
- TaskSchedulerImpl(
- TaskAssigner assigner,
- Preemptor preemptor,
- ExecutorSettings executorSettings,
- BiCache<String, TaskGroupKey> reservations) {
-
- this.assigner = requireNonNull(assigner);
- this.preemptor = requireNonNull(preemptor);
- this.executorSettings = requireNonNull(executorSettings);
- this.reservations = requireNonNull(reservations);
- }
-
- @Timed ("task_schedule_attempt")
- public Set<String> schedule(MutableStoreProvider store, Iterable<String> taskIds) {
- try {
- return scheduleTasks(store, taskIds);
- } catch (RuntimeException e) {
- // We catch the generic unchecked exception here to ensure tasks are not abandoned
- // if there is a transient issue resulting in an unchecked exception.
- LOG.warn("Task scheduling unexpectedly failed, will be retried", e);
- attemptsFailed.incrementAndGet();
- // Return empty set for all task IDs to be retried later.
- // It's ok if some tasks were already assigned, those will be ignored in the next round.
- return ImmutableSet.of();
- }
- }
-
- private Set<String> scheduleTasks(MutableStoreProvider store, Iterable<String> tasks) {
- ImmutableSet<String> taskIds = ImmutableSet.copyOf(tasks);
- String taskIdValues = Joiner.on(",").join(taskIds);
- LOG.debug("Attempting to schedule tasks {}", taskIdValues);
- ImmutableSet<IAssignedTask> assignedTasks =
- ImmutableSet.copyOf(Iterables.transform(
- store.getTaskStore().fetchTasks(Query.taskScoped(taskIds).byStatus(PENDING)),
- IScheduledTask::getAssignedTask));
-
- if (Iterables.isEmpty(assignedTasks)) {
- LOG.warn("Failed to look up all tasks in a scheduling round: {}", taskIdValues);
- return taskIds;
- }
-
- Preconditions.checkState(
- assignedTasks.stream()
- .collect(Collectors.groupingBy(t -> t.getTask()))
- .entrySet()
- .size() == 1,
- "Found multiple task groups for %s",
- taskIdValues);
-
- Map<String, IAssignedTask> assignableTaskMap =
- assignedTasks.stream().collect(toMap(t -> t.getTaskId(), t -> t));
-
- if (taskIds.size() != assignedTasks.size()) {
- LOG.warn("Failed to look up tasks "
- + Joiner.on(", ").join(Sets.difference(taskIds, assignableTaskMap.keySet())));
- }
-
- // This is safe after all checks above.
- ITaskConfig task = assignedTasks.stream().findFirst().get().getTask();
- AttributeAggregate aggregate = AttributeAggregate.getJobActiveState(store, task.getJob());
-
- // Valid Docker tasks can have a container but no executor config
- ResourceBag overhead = ResourceBag.EMPTY;
- if (task.isSetExecutorConfig()) {
- overhead = executorSettings.getExecutorOverhead(task.getExecutorConfig().getName())
- .orElseThrow(
- () -> new IllegalArgumentException("Cannot find executor configuration"));
- }
-
- Set<String> launched = assigner.maybeAssign(
- store,
- new ResourceRequest(
- task,
- bagFromResources(task.getResources()).add(overhead), aggregate),
- TaskGroupKey.from(task),
- assignedTasks,
- reservations.asMap());
-
- attemptsFired.addAndGet(assignableTaskMap.size());
- Set<String> failedToLaunch = Sets.difference(assignableTaskMap.keySet(), launched);
-
- failedToLaunch.forEach(taskId -> {
- // Task could not be scheduled.
- // TODO(maxim): Now that preemption slots are searched asynchronously, consider
- // retrying a launch attempt within the current scheduling round IFF a reservation is
- // available.
- maybePreemptFor(assignableTaskMap.get(taskId), aggregate, store);
- });
- attemptsNoMatch.addAndGet(failedToLaunch.size());
-
- // Return all successfully launched tasks as well as those weren't tried (not in PENDING).
- return Sets.union(launched, Sets.difference(taskIds, assignableTaskMap.keySet()));
- }
-
- private void maybePreemptFor(
- IAssignedTask task,
- AttributeAggregate jobState,
- MutableStoreProvider storeProvider) {
-
- if (!reservations.getByValue(TaskGroupKey.from(task.getTask())).isEmpty()) {
- return;
- }
- Optional<String> slaveId = preemptor.attemptPreemptionFor(task, jobState, storeProvider);
- if (slaveId.isPresent()) {
- reservations.put(slaveId.get(), TaskGroupKey.from(task.getTask()));
- }
- }
-
- @Subscribe
- public void taskChanged(final TaskStateChange stateChangeEvent) {
- if (Optional.of(PENDING).equals(stateChangeEvent.getOldState())) {
- IAssignedTask assigned = stateChangeEvent.getTask().getAssignedTask();
- if (assigned.getSlaveId() != null) {
- reservations.remove(assigned.getSlaveId(), TaskGroupKey.from(assigned.getTask()));
- }
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImpl.java
new file mode 100644
index 0000000..b6d5d95
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImpl.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.scheduling;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import javax.inject.Inject;
+import javax.inject.Qualifier;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import com.google.common.eventbus.Subscribe;
+
+import org.apache.aurora.common.inject.TimedInterceptor.Timed;
+import org.apache.aurora.common.stats.Stats;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.TaskGroupKey;
+import org.apache.aurora.scheduler.configuration.executor.ExecutorSettings;
+import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.filter.AttributeAggregate;
+import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.preemptor.BiCache;
+import org.apache.aurora.scheduler.preemptor.Preemptor;
+import org.apache.aurora.scheduler.resources.ResourceBag;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toMap;
+
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.scheduler.resources.ResourceManager.bagFromResources;
+
+/**
+ * An asynchronous task scheduler. Scheduling of tasks is performed on a delay, where each task
+ * backs off after a failed scheduling attempt.
+ * <p>
+ * Pending tasks are advertised to the scheduler via internal pubsub notifications.
+ */
+public class TaskSchedulerImpl implements TaskScheduler {
+  /**
+   * Binding annotation for the time duration of reservations.
+   */
+  @VisibleForTesting
+  @Qualifier
+  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+  public @interface ReservationDuration { }
+
+  private static final Logger LOG = LoggerFactory.getLogger(TaskSchedulerImpl.class);
+
+  private final TaskAssigner assigner;
+  private final Preemptor preemptor;
+  private final ExecutorSettings executorSettings;
+  // Maps an agent (slave) ID to the task group that a preemption slot was reserved for.
+  private final BiCache<String, TaskGroupKey> reservations;
+
+  private final AtomicLong attemptsFired = Stats.exportLong("schedule_attempts_fired");
+  private final AtomicLong attemptsFailed = Stats.exportLong("schedule_attempts_failed");
+  private final AtomicLong attemptsNoMatch = Stats.exportLong("schedule_attempts_no_match");
+
+  @Inject
+  TaskSchedulerImpl(
+      TaskAssigner assigner,
+      Preemptor preemptor,
+      ExecutorSettings executorSettings,
+      BiCache<String, TaskGroupKey> reservations) {
+
+    this.assigner = requireNonNull(assigner);
+    this.preemptor = requireNonNull(preemptor);
+    this.executorSettings = requireNonNull(executorSettings);
+    this.reservations = requireNonNull(reservations);
+  }
+
+  /**
+   * Attempts to schedule the given tasks, which are expected to share one task configuration.
+   * <p>
+   * Unchecked exceptions raised while scheduling are logged and counted instead of being
+   * propagated, so that the caller retries every task in a later round.
+   *
+   * @param store Storage provider used to look up and update tasks.
+   * @param taskIds IDs of the tasks to schedule.
+   * @return IDs of the tasks that were launched or were not found in PENDING state; an empty set
+   *         if scheduling failed with an unchecked exception.
+   */
+  @Timed("task_schedule_attempt")
+  @Override
+  public Set<String> schedule(Storage.MutableStoreProvider store, Iterable<String> taskIds) {
+    try {
+      return scheduleTasks(store, taskIds);
+    } catch (RuntimeException e) {
+      // We catch the generic unchecked exception here to ensure tasks are not abandoned
+      // if there is a transient issue resulting in an unchecked exception.
+      LOG.warn("Task scheduling unexpectedly failed, will be retried", e);
+      attemptsFailed.incrementAndGet();
+      // Return empty set for all task IDs to be retried later.
+      // It's ok if some tasks were already assigned, those will be ignored in the next round.
+      return ImmutableSet.of();
+    }
+  }
+
+  /**
+   * Looks up the PENDING tasks among {@code tasks}, asks the assigner to launch them, and
+   * attempts preemption for every task that could not be launched.
+   *
+   * @param store Storage provider used to look up and update tasks.
+   * @param tasks IDs of the tasks to schedule.
+   * @return IDs of the tasks that were launched or were not found in PENDING state.
+   * @throws IllegalStateException If the PENDING tasks do not all share one task configuration.
+   * @throws IllegalArgumentException If no executor overhead is found for the task's executor.
+   */
+  private Set<String> scheduleTasks(Storage.MutableStoreProvider store, Iterable<String> tasks) {
+    ImmutableSet<String> taskIds = ImmutableSet.copyOf(tasks);
+    String taskIdValues = Joiner.on(",").join(taskIds);
+    LOG.debug("Attempting to schedule tasks {}", taskIdValues);
+    ImmutableSet<IAssignedTask> assignedTasks =
+        ImmutableSet.copyOf(Iterables.transform(
+            store.getTaskStore().fetchTasks(Query.taskScoped(taskIds).byStatus(PENDING)),
+            IScheduledTask::getAssignedTask));
+
+    if (assignedTasks.isEmpty()) {
+      LOG.warn("Failed to look up all tasks in a scheduling round: {}", taskIdValues);
+      return taskIds;
+    }
+
+    Preconditions.checkState(
+        assignedTasks.stream()
+            .collect(Collectors.groupingBy(IAssignedTask::getTask))
+            .size() == 1,
+        "Found multiple task groups for %s",
+        taskIdValues);
+
+    Map<String, IAssignedTask> assignableTaskMap =
+        assignedTasks.stream().collect(toMap(IAssignedTask::getTaskId, t -> t));
+
+    if (taskIds.size() != assignedTasks.size()) {
+      LOG.warn("Failed to look up tasks {}",
+          Joiner.on(", ").join(Sets.difference(taskIds, assignableTaskMap.keySet())));
+    }
+
+    // This is safe after all checks above.
+    ITaskConfig task = assignedTasks.iterator().next().getTask();
+    AttributeAggregate aggregate = AttributeAggregate.getJobActiveState(store, task.getJob());
+
+    // Valid Docker tasks can have a container but no executor config
+    ResourceBag overhead = ResourceBag.EMPTY;
+    if (task.isSetExecutorConfig()) {
+      overhead = executorSettings.getExecutorOverhead(task.getExecutorConfig().getName())
+          .orElseThrow(
+              () -> new IllegalArgumentException("Cannot find executor configuration"));
+    }
+
+    Set<String> launched = assigner.maybeAssign(
+        store,
+        new SchedulingFilter.ResourceRequest(
+            task,
+            bagFromResources(task.getResources()).add(overhead), aggregate),
+        TaskGroupKey.from(task),
+        assignedTasks,
+        reservations.asMap());
+
+    attemptsFired.addAndGet(assignableTaskMap.size());
+    Set<String> failedToLaunch = Sets.difference(assignableTaskMap.keySet(), launched);
+
+    failedToLaunch.forEach(taskId -> {
+      // Task could not be scheduled.
+      // TODO(maxim): Now that preemption slots are searched asynchronously, consider
+      // retrying a launch attempt within the current scheduling round IFF a reservation is
+      // available.
+      maybePreemptFor(assignableTaskMap.get(taskId), aggregate, store);
+    });
+    attemptsNoMatch.addAndGet(failedToLaunch.size());
+
+    // Return all successfully launched tasks as well as those that weren't tried (not PENDING).
+    return Sets.union(launched, Sets.difference(taskIds, assignableTaskMap.keySet()));
+  }
+
+  /**
+   * Attempts to preempt on behalf of a task that could not be launched, unless its task group
+   * already holds a reservation. The agent returned by the preemptor, if any, is reserved for
+   * the task's group.
+   *
+   * @param task Task that failed to launch.
+   * @param jobState Attribute aggregate of the task's job.
+   * @param storeProvider Storage provider passed through to the preemptor.
+   */
+  private void maybePreemptFor(
+      IAssignedTask task,
+      AttributeAggregate jobState,
+      Storage.MutableStoreProvider storeProvider) {
+
+    if (!reservations.getByValue(TaskGroupKey.from(task.getTask())).isEmpty()) {
+      return;
+    }
+    Optional<String> slaveId = preemptor.attemptPreemptionFor(task, jobState, storeProvider);
+    if (slaveId.isPresent()) {
+      reservations.put(slaveId.get(), TaskGroupKey.from(task.getTask()));
+    }
+  }
+
+  /**
+   * Releases the reservation held for a task's group on its agent once the task leaves PENDING.
+   *
+   * @param stateChangeEvent Task state change notification.
+   */
+  @Subscribe
+  public void taskChanged(final PubsubEvent.TaskStateChange stateChangeEvent) {
+    if (Optional.of(PENDING).equals(stateChangeEvent.getOldState())) {
+      IAssignedTask assigned = stateChangeEvent.getTask().getAssignedTask();
+      if (assigned.getSlaveId() != null) {
+        reservations.remove(assigned.getSlaveId(), TaskGroupKey.from(assigned.getTask()));
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/state/FirstFitTaskAssignerModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/FirstFitTaskAssignerModule.java b/src/main/java/org/apache/aurora/scheduler/state/FirstFitTaskAssignerModule.java
deleted file mode 100644
index dc244ee..0000000
--- a/src/main/java/org/apache/aurora/scheduler/state/FirstFitTaskAssignerModule.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.state;
-
-import javax.inject.Singleton;
-
-import com.google.inject.AbstractModule;
-
-import org.apache.aurora.scheduler.state.TaskAssigner.FirstFitTaskAssigner;
-
-/**
- * Exposes the default TaskAssigner implementation, which is a first-fit scheduling algorithm.
- */
-public class FirstFitTaskAssignerModule extends AbstractModule {
- @Override
- protected void configure() {
- bind(TaskAssigner.class).to(FirstFitTaskAssigner.class);
- bind(FirstFitTaskAssigner.class).in(Singleton.class);
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/StateModule.java b/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
index b7a3c0b..46e9227 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/StateModule.java
@@ -29,6 +29,7 @@ import org.apache.aurora.scheduler.config.CliOptions;
import org.apache.aurora.scheduler.events.PubsubEventModule;
import org.apache.aurora.scheduler.mesos.MesosTaskFactory;
import org.apache.aurora.scheduler.mesos.MesosTaskFactory.MesosTaskFactoryImpl;
+import org.apache.aurora.scheduler.scheduling.TaskAssignerImplModule;
import org.apache.aurora.scheduler.state.MaintenanceController.MaintenanceControllerImpl;
import org.apache.aurora.scheduler.state.UUIDGenerator.UUIDGeneratorImpl;
@@ -42,7 +43,7 @@ public class StateModule extends AbstractModule {
@Parameter(names = "-task_assigner_modules",
description = "Guice modules for customizing task assignment.")
@SuppressWarnings("rawtypes")
- public List<Class> taskAssignerModules = ImmutableList.of(FirstFitTaskAssignerModule.class);
+ public List<Class> taskAssignerModules = ImmutableList.of(TaskAssignerImplModule.class);
}
private final CliOptions options;
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
deleted file mode 100644
index cdd0d15..0000000
--- a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
+++ /dev/null
@@ -1,338 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.state;
-
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-
-import org.apache.aurora.common.inject.TimedInterceptor.Timed;
-import org.apache.aurora.common.stats.StatsProvider;
-import org.apache.aurora.scheduler.HostOffer;
-import org.apache.aurora.scheduler.TierManager;
-import org.apache.aurora.scheduler.base.InstanceKeys;
-import org.apache.aurora.scheduler.base.TaskGroupKey;
-import org.apache.aurora.scheduler.filter.SchedulingFilter;
-import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
-import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
-import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
-import org.apache.aurora.scheduler.filter.SchedulingFilter.VetoGroup;
-import org.apache.aurora.scheduler.mesos.MesosTaskFactory;
-import org.apache.aurora.scheduler.offers.OfferManager;
-import org.apache.aurora.scheduler.resources.ResourceManager;
-import org.apache.aurora.scheduler.resources.ResourceType;
-import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
-import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
-import org.apache.aurora.scheduler.updater.UpdateAgentReserver;
-import org.apache.mesos.v1.Protos;
-import org.apache.mesos.v1.Protos.TaskInfo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static java.util.Objects.requireNonNull;
-
-import static org.apache.aurora.gen.ScheduleStatus.LOST;
-import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import static org.apache.mesos.v1.Protos.Offer;
-
-/**
- * Responsible for matching a task against an offer and launching it.
- */
-public interface TaskAssigner {
- /**
- * Tries to match a task against an offer. If a match is found, the assigner makes the
- * appropriate changes to the task and requests task launch.
- *
- * @param storeProvider Storage provider.
- * @param resourceRequest The request for resources being scheduled.
- * @param groupKey Task group key.
- * @param tasks Tasks to assign.
- * @param preemptionReservations Slave reservations.
- * @return Successfully assigned task IDs.
- */
- Set<String> maybeAssign(
- MutableStoreProvider storeProvider,
- ResourceRequest resourceRequest,
- TaskGroupKey groupKey,
- Iterable<IAssignedTask> tasks,
- Map<String, TaskGroupKey> preemptionReservations);
-
- class FirstFitTaskAssigner implements TaskAssigner {
- private static final Logger LOG = LoggerFactory.getLogger(FirstFitTaskAssigner.class);
-
- @VisibleForTesting
- static final Optional<String> LAUNCH_FAILED_MSG =
- Optional.of("Unknown exception attempting to schedule task.");
- @VisibleForTesting
- static final String ASSIGNER_LAUNCH_FAILURES = "assigner_launch_failures";
- @VisibleForTesting
- static final String ASSIGNER_EVALUATED_OFFERS = "assigner_evaluated_offers";
-
- private final AtomicLong launchFailures;
- private final AtomicLong evaluatedOffers;
-
- private final StateManager stateManager;
- private final SchedulingFilter filter;
- private final MesosTaskFactory taskFactory;
- private final OfferManager offerManager;
- private final TierManager tierManager;
- private final UpdateAgentReserver updateAgentReserver;
-
- @Inject
- public FirstFitTaskAssigner(
- StateManager stateManager,
- SchedulingFilter filter,
- MesosTaskFactory taskFactory,
- OfferManager offerManager,
- TierManager tierManager,
- UpdateAgentReserver updateAgentReserver,
- StatsProvider statsProvider) {
-
- this.stateManager = requireNonNull(stateManager);
- this.filter = requireNonNull(filter);
- this.taskFactory = requireNonNull(taskFactory);
- this.offerManager = requireNonNull(offerManager);
- this.tierManager = requireNonNull(tierManager);
- this.launchFailures = statsProvider.makeCounter(ASSIGNER_LAUNCH_FAILURES);
- this.evaluatedOffers = statsProvider.makeCounter(ASSIGNER_EVALUATED_OFFERS);
- this.updateAgentReserver = requireNonNull(updateAgentReserver);
- }
-
- @VisibleForTesting
- IAssignedTask mapAndAssignResources(Offer offer, IAssignedTask task) {
- IAssignedTask assigned = task;
- for (ResourceType type : ResourceManager.getTaskResourceTypes(assigned)) {
- if (type.getMapper().isPresent()) {
- assigned = type.getMapper().get().mapAndAssign(offer, assigned);
- }
- }
- return assigned;
- }
-
- private TaskInfo assign(
- MutableStoreProvider storeProvider,
- Offer offer,
- String taskId,
- boolean revocable) {
-
- String host = offer.getHostname();
- IAssignedTask assigned = stateManager.assignTask(
- storeProvider,
- taskId,
- host,
- offer.getAgentId(),
- task -> mapAndAssignResources(offer, task));
- LOG.info(
- "Offer on agent {} (id {}) is being assigned task for {}.",
- host, offer.getAgentId().getValue(), taskId);
- return taskFactory.createFrom(assigned, offer, revocable);
- }
-
- private boolean evaluateOffer(
- MutableStoreProvider storeProvider,
- boolean revocable,
- ResourceRequest resourceRequest,
- TaskGroupKey groupKey,
- IAssignedTask task,
- HostOffer offer,
- ImmutableSet.Builder<String> assignmentResult) throws OfferManager.LaunchException {
-
- String taskId = task.getTaskId();
- Set<Veto> vetoes = filter.filter(
- new UnusedResource(
- offer.getResourceBag(revocable),
- offer.getAttributes(),
- offer.getUnavailabilityStart()),
- resourceRequest);
-
- if (vetoes.isEmpty()) {
- TaskInfo taskInfo = assign(
- storeProvider,
- offer.getOffer(),
- taskId,
- revocable);
- resourceRequest.getJobState().updateAttributeAggregate(offer.getAttributes());
-
- try {
- offerManager.launchTask(offer.getOffer().getId(), taskInfo);
- assignmentResult.add(taskId);
- return true;
- } catch (OfferManager.LaunchException e) {
- LOG.warn("Failed to launch task.", e);
- launchFailures.incrementAndGet();
-
- // The attempt to schedule the task failed, so we need to backpedal on the
- // assignment.
- // It is in the LOST state and a new task will move to PENDING to replace it.
- // Should the state change fail due to storage issues, that's okay. The task will
- // time out in the ASSIGNED state and be moved to LOST.
- stateManager.changeState(
- storeProvider,
- taskId,
- Optional.of(PENDING),
- LOST,
- LAUNCH_FAILED_MSG);
- throw e;
- }
- } else {
- if (Veto.identifyGroup(vetoes) == VetoGroup.STATIC) {
- // Never attempt to match this offer/groupKey pair again.
- offerManager.banOfferForTaskGroup(offer.getOffer().getId(), groupKey);
- }
- LOG.debug("Agent {} vetoed task {}: {}", offer.getOffer().getHostname(), taskId, vetoes);
- }
- return false;
- }
-
- private Iterable<IAssignedTask> maybeAssignReserved(
- Iterable<IAssignedTask> tasks,
- MutableStoreProvider storeProvider,
- boolean revocable,
- ResourceRequest resourceRequest,
- TaskGroupKey groupKey,
- ImmutableSet.Builder<String> assignmentResult) {
-
- if (!updateAgentReserver.hasReservations(groupKey)) {
- return tasks;
- }
-
- // Data structure to record which tasks should be excluded from the regular (non-reserved)
- // scheduling loop. This is important because we release reservations once they are used,
- // so we need to record them separately to avoid them being double-scheduled.
- ImmutableSet.Builder<IInstanceKey> excludeBuilder = ImmutableSet.builder();
-
- for (IAssignedTask task: tasks) {
- IInstanceKey key = InstanceKeys.from(task.getTask().getJob(), task.getInstanceId());
- Optional<String> maybeAgentId = updateAgentReserver.getAgent(key);
- if (maybeAgentId.isPresent()) {
- excludeBuilder.add(key);
- Optional<HostOffer> offer = offerManager.getOffer(
- Protos.AgentID.newBuilder().setValue(maybeAgentId.get()).build());
- if (offer.isPresent()) {
- try {
- // The offer can still be veto'd because of changed constraints, or because the
- // Scheduler hasn't been updated by Mesos yet...
- if (evaluateOffer(
- storeProvider,
- revocable,
- resourceRequest,
- groupKey,
- task,
- offer.get(),
- assignmentResult)) {
-
- LOG.info("Used update reservation for {} on {}", key, maybeAgentId.get());
- updateAgentReserver.release(maybeAgentId.get(), key);
- } else {
- LOG.info(
- "Tried to reuse offer on {} for {}, but was not ready yet.",
- maybeAgentId.get(),
- key);
- }
- } catch (OfferManager.LaunchException e) {
- updateAgentReserver.release(maybeAgentId.get(), key);
- }
- }
- }
- }
-
- // Return only the tasks that didn't have reservations. Offers on agents that were reserved
- // might not have been seen by Aurora yet, so we need to wait until the reservation expires
- // before giving up and falling back to the first-fit algorithm.
- Set<IInstanceKey> toBeExcluded = excludeBuilder.build();
- return Iterables.filter(tasks, t -> !toBeExcluded.contains(
- InstanceKeys.from(t.getTask().getJob(), t.getInstanceId())));
- }
-
- @Timed("assigner_maybe_assign")
- @Override
- public Set<String> maybeAssign(
- MutableStoreProvider storeProvider,
- ResourceRequest resourceRequest,
- TaskGroupKey groupKey,
- Iterable<IAssignedTask> tasks,
- Map<String, TaskGroupKey> preemptionReservations) {
-
- if (Iterables.isEmpty(tasks)) {
- return ImmutableSet.of();
- }
-
- boolean revocable = tierManager.getTier(groupKey.getTask()).isRevocable();
- ImmutableSet.Builder<String> assignmentResult = ImmutableSet.builder();
-
- Iterable<IAssignedTask> nonReservedTasks = maybeAssignReserved(
- tasks,
- storeProvider,
- revocable,
- resourceRequest,
- groupKey,
- assignmentResult);
-
- Iterator<IAssignedTask> remainingTasks = nonReservedTasks.iterator();
- // Make sure we still have tasks to process after reservations are processed.
- if (remainingTasks.hasNext()) {
- IAssignedTask task = remainingTasks.next();
- for (HostOffer offer : offerManager.getOffers(groupKey)) {
-
- if (!offer.hasCpuAndMem()) {
- // This offer lacks any type of CPU or mem resource, and therefore will never match
- // a task.
- continue;
- }
-
- String agentId = offer.getOffer().getAgentId().getValue();
-
- Optional<TaskGroupKey> reservedGroup = Optional.fromNullable(
- preemptionReservations.get(agentId));
-
- if (reservedGroup.isPresent() && !reservedGroup.get().equals(groupKey)) {
- // This slave is reserved for a different task group -> skip.
- continue;
- }
-
- if (!updateAgentReserver.getReservations(agentId).isEmpty()) {
- // This agent has been reserved for an update in-progress, skip.
- continue;
- }
-
- evaluatedOffers.incrementAndGet();
- try {
- boolean offerUsed = evaluateOffer(
- storeProvider, revocable, resourceRequest, groupKey, task, offer, assignmentResult);
- if (offerUsed) {
- if (remainingTasks.hasNext()) {
- task = remainingTasks.next();
- } else {
- break;
- }
- }
- } catch (OfferManager.LaunchException e) {
- break;
- }
- }
- }
-
- return assignmentResult.build();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java b/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
index 6033c01..e629093 100644
--- a/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
@@ -157,7 +157,7 @@ public class AsyncStatsModule extends AbstractModule {
@Override
public Iterable<MachineResource> get() {
- Iterable<HostOffer> offers = offerManager.getOffers();
+ Iterable<HostOffer> offers = offerManager.getAll();
ImmutableList.Builder<MachineResource> builder = ImmutableList.builder();
for (HostOffer offer : offers) {
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java b/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
index 0ec4de6..5cb5310 100644
--- a/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
@@ -160,6 +160,7 @@ public class CommandLineTest {
expected.scheduling.reservationDuration = TEST_TIME;
expected.scheduling.schedulingMaxBatchSize = 42;
expected.scheduling.maxTasksPerScheduleAttempt = 42;
+ expected.taskAssigner.offerSelectorModules = ImmutableList.of(NoopModule.class);
expected.async.asyncWorkerThreads = 42;
expected.zk.inProcess = true;
expected.zk.zkEndpoints = ImmutableList.of(InetSocketAddress.createUnresolved("testing", 42));
@@ -266,11 +267,11 @@ public class CommandLineTest {
"-hold_offers_forever=true",
"-min_offer_hold_time=42days",
"-offer_hold_jitter_window=42days",
- "-offer_static_ban_cache_max_size=42",
"-offer_filter_duration=42days",
"-unavailability_threshold=42days",
"-offer_order=CPU,DISK",
"-offer_order_modules=org.apache.aurora.scheduler.config.CommandLineTest$NoopModule",
+ "-offer_static_ban_cache_max_size=42",
"-custom_executor_config=" + tempFile.getAbsolutePath(),
"-thermos_executor_path=testing",
"-thermos_executor_resources=testing",
@@ -306,6 +307,7 @@ public class CommandLineTest {
"-offer_reservation_duration=42days",
"-scheduling_max_batch_size=42",
"-max_tasks_per_schedule_attempt=42",
+ "-offer_selector_modules=org.apache.aurora.scheduler.config.CommandLineTest$NoopModule",
"-async_worker_threads=42",
"-zk_in_proc=true",
"-zk_endpoints=testing:42",
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/test/java/org/apache/aurora/scheduler/http/OffersTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/http/OffersTest.java b/src/test/java/org/apache/aurora/scheduler/http/OffersTest.java
index 3069959..549d2e3 100644
--- a/src/test/java/org/apache/aurora/scheduler/http/OffersTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/http/OffersTest.java
@@ -49,7 +49,7 @@ public class OffersTest extends EasyMockTest {
@Test
public void testNoOffers() throws Exception {
- expect(offerManager.getOffers()).andReturn(ImmutableSet.of());
+ expect(offerManager.getAll()).andReturn(ImmutableSet.of());
control.replay();
@@ -134,7 +134,7 @@ public class OffersTest extends EasyMockTest {
.build(),
IHostAttributes.build(new HostAttributes().setMode(NONE)));
- expect(offerManager.getOffers()).andReturn(ImmutableSet.of(offer));
+ expect(offerManager.getAll()).andReturn(ImmutableSet.of(offer));
control.replay();
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java
index 45ae6bb..64efc0d 100644
--- a/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java
@@ -259,7 +259,7 @@ public class MesosCallbackHandlerTest extends EasyMockTest {
public void testOffers() {
storageUtil.expectOperations();
expectOfferAttributesSaved(HOST_OFFER);
- offerManager.addOffer(HOST_OFFER);
+ offerManager.add(HOST_OFFER);
control.replay();
@@ -272,8 +272,8 @@ public class MesosCallbackHandlerTest extends EasyMockTest {
storageUtil.expectOperations();
expectOfferAttributesSaved(HOST_OFFER);
expectOfferAttributesSaved(HOST_OFFER_2);
- offerManager.addOffer(HOST_OFFER);
- offerManager.addOffer(HOST_OFFER_2);
+ offerManager.add(HOST_OFFER);
+ offerManager.add(HOST_OFFER_2);
control.replay();
@@ -296,7 +296,7 @@ public class MesosCallbackHandlerTest extends EasyMockTest {
expect(storageUtil.attributeStore.saveHostAttributes(saved)).andReturn(true);
// If the host is in draining, then the offer manager should get an offer with that attribute
- offerManager.addOffer(DRAINING_HOST_OFFER);
+ offerManager.add(DRAINING_HOST_OFFER);
control.replay();
handler.handleOffers(ImmutableList.of(HOST_OFFER.getOffer()));
@@ -316,7 +316,7 @@ public class MesosCallbackHandlerTest extends EasyMockTest {
@Test
public void testRescind() {
- expect(offerManager.cancelOffer(OFFER_ID)).andReturn(true);
+ expect(offerManager.cancel(OFFER_ID)).andReturn(true);
control.replay();
@@ -336,12 +336,12 @@ public class MesosCallbackHandlerTest extends EasyMockTest {
FakeScheduledThreadPoolExecutor fakeExecutor = new FakeScheduledThreadPoolExecutor();
createHandler(false, fakeExecutor);
- expect(offerManager.cancelOffer(OFFER_ID)).andReturn(false);
- offerManager.banOffer(OFFER_ID);
+ expect(offerManager.cancel(OFFER_ID)).andReturn(false);
+ offerManager.ban(OFFER_ID);
storageUtil.expectOperations();
expectOfferAttributesSaved(HOST_OFFER);
- offerManager.addOffer(HOST_OFFER);
- expect(offerManager.cancelOffer(OFFER_ID)).andReturn(true);
+ offerManager.add(HOST_OFFER);
+ expect(offerManager.cancel(OFFER_ID)).andReturn(true);
control.replay();
replay(offerManager);
@@ -355,7 +355,7 @@ public class MesosCallbackHandlerTest extends EasyMockTest {
// Eventually, we unban the offer.
handler.handleRescind(OFFER_ID);
- // 2 commands executed (addOffer and unbanOffer).
+ // 2 commands executed (add and unbanOffer).
fakeExecutor.advance();
fakeExecutor.advance();
[3/3] aurora git commit: Enable custom offer scoring modules for task
assignment
Posted by wf...@apache.org.
Enable custom offer scoring modules for task assignment
Major portions of the refactor:
* Refactor `OfferManager` to do filtering of offers (added `getMatching` and
`getAllMatching` methods) as opposed to TaskAssigner
* Refactor `TaskAssigner`, allow for injection of custom "scoring" class
through `OfferSelector` interface
And some minor things as well:
* Moved `TaskAssignerImpl`, `TaskSchedulerImpl`, and `HostOffers` into their own
upper-level classes
* Moved `TaskAssigner` to the `scheduling` package and out of the `state` package
* Renamed some methods in `OfferManager` to avoid code stutter
* Renaming of some classes (e.g. `FirstFitTaskAssigner` -> `TaskAssignerImpl`)
* And a slew of others
Reviewed at https://reviews.apache.org/r/63973/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/80139da4
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/80139da4
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/80139da4
Branch: refs/heads/master
Commit: 80139da4624916e406c7e80c4ea2d286d4d859c3
Parents: 21af250
Author: Jordan Ly <jo...@gmail.com>
Authored: Tue Nov 28 11:02:14 2017 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Tue Nov 28 11:02:14 2017 -0800
----------------------------------------------------------------------
RELEASE-NOTES.md | 3 +
docs/reference/scheduler-configuration.md | 2 +-
.../org/apache/aurora/benchmark/Offers.java | 2 +-
.../aurora/benchmark/SchedulingBenchmarks.java | 16 +-
.../benchmark/fakes/FakeOfferManager.java | 31 +-
.../apache/aurora/scheduler/app/AppModule.java | 4 +-
.../aurora/scheduler/config/CliOptions.java | 6 +-
.../scheduler/filter/AttributeAggregate.java | 38 +-
.../scheduler/filter/SchedulingFilter.java | 5 +
.../scheduler/filter/SchedulingFilterImpl.java | 2 +-
.../apache/aurora/scheduler/http/Offers.java | 2 +-
.../scheduler/mesos/MesosCallbackHandler.java | 12 +-
.../aurora/scheduler/offers/HostOffers.java | 253 +++++++++
.../aurora/scheduler/offers/OfferManager.java | 404 ++------------
.../scheduler/offers/OfferManagerImpl.java | 246 +++++++++
.../scheduler/offers/OfferManagerModule.java | 211 ++++++++
.../aurora/scheduler/offers/OfferSettings.java | 7 +-
.../aurora/scheduler/offers/OffersModule.java | 211 --------
.../preemptor/PendingTaskProcessor.java | 2 +-
.../aurora/scheduler/preemptor/Preemptor.java | 2 +-
.../scheduling/FirstFitOfferSelector.java | 29 +
.../scheduling/FirstFitOfferSelectorModule.java | 26 +
.../scheduler/scheduling/OfferSelector.java | 36 ++
.../scheduler/scheduling/SchedulingModule.java | 4 +-
.../scheduler/scheduling/TaskAssigner.java | 46 ++
.../scheduler/scheduling/TaskAssignerImpl.java | 284 ++++++++++
.../scheduling/TaskAssignerImplModule.java | 59 ++
.../scheduler/scheduling/TaskScheduler.java | 191 -------
.../scheduler/scheduling/TaskSchedulerImpl.java | 207 +++++++
.../state/FirstFitTaskAssignerModule.java | 31 --
.../aurora/scheduler/state/StateModule.java | 3 +-
.../aurora/scheduler/state/TaskAssigner.java | 338 ------------
.../scheduler/stats/AsyncStatsModule.java | 2 +-
.../scheduler/config/CommandLineTest.java | 4 +-
.../aurora/scheduler/http/OffersTest.java | 4 +-
.../mesos/MesosCallbackHandlerTest.java | 20 +-
.../scheduler/offers/OfferManagerImplTest.java | 381 +++++++++----
.../preemptor/PendingTaskProcessorTest.java | 2 +-
.../scheduler/preemptor/PreemptorImplTest.java | 2 +-
.../preemptor/PreemptorModuleTest.java | 2 +-
.../scheduling/FirstFitOfferSelectorTest.java | 66 +++
.../scheduling/TaskAssignerImplTest.java | 374 +++++++++++++
.../scheduling/TaskSchedulerImplTest.java | 2 -
.../state/FirstFitTaskAssignerTest.java | 539 -------------------
.../scheduler/stats/AsyncStatsModuleTest.java | 2 +-
45 files changed, 2244 insertions(+), 1869 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/RELEASE-NOTES.md
----------------------------------------------------------------------
diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index 2d3c423..54dcc75 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -12,6 +12,9 @@
a production cluster. For that reason, the functionality is behind a new flag `-partition_aware`
that is disabled by default. When Mesos support is improved and the new behavior is vetted in
production clusters, we'll enable this by default.
+- Added the ability to "score" offers for a given scheduling assignment via the `OfferSelector`
+ interface. The default implementation is first fit, but cluster operators can inject a custom
+ scoring algorithm through the `-offer_selector_modules` flag.
### Deprecations and removals:
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/docs/reference/scheduler-configuration.md
----------------------------------------------------------------------
diff --git a/docs/reference/scheduler-configuration.md b/docs/reference/scheduler-configuration.md
index 6c385b5..f697b6f 100644
--- a/docs/reference/scheduler-configuration.md
+++ b/docs/reference/scheduler-configuration.md
@@ -222,7 +222,7 @@ Optional flags:
Time for a stat to be retained in memory before expiring.
-stat_sampling_interval (default (1, secs))
Statistic value sampling interval.
--task_assigner_modules (default [class org.apache.aurora.scheduler.state.FirstFitTaskAssignerModule])
+-task_assigner_modules (default [class org.apache.aurora.scheduler.scheduling.TaskAssignerImplModule])
Guice modules for replacing task assignment logic.
-thermos_executor_cpu (default 0.25)
The number of CPU cores to allocate for each instance of the executor.
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/jmh/java/org/apache/aurora/benchmark/Offers.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/Offers.java b/src/jmh/java/org/apache/aurora/benchmark/Offers.java
index 2b46326..2fcc804 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/Offers.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/Offers.java
@@ -50,7 +50,7 @@ final class Offers {
*/
static void addOffers(OfferManager offerManager, Iterable<HostOffer> offers) {
for (HostOffer offer : offers) {
- offerManager.addOffer(offer);
+ offerManager.add(offer);
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
index 1708a50..58e3224 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
@@ -62,15 +62,17 @@ import org.apache.aurora.scheduler.mesos.Driver;
import org.apache.aurora.scheduler.mesos.TestExecutorSettings;
import org.apache.aurora.scheduler.offers.Deferment;
import org.apache.aurora.scheduler.offers.OfferManager;
+import org.apache.aurora.scheduler.offers.OfferManagerImpl;
+import org.apache.aurora.scheduler.offers.OfferManagerModule;
import org.apache.aurora.scheduler.offers.OfferOrder;
import org.apache.aurora.scheduler.offers.OfferSettings;
-import org.apache.aurora.scheduler.offers.OffersModule;
import org.apache.aurora.scheduler.preemptor.BiCache;
import org.apache.aurora.scheduler.preemptor.ClusterStateImpl;
import org.apache.aurora.scheduler.preemptor.PreemptorModule;
import org.apache.aurora.scheduler.scheduling.RescheduleCalculator;
import org.apache.aurora.scheduler.scheduling.TaskScheduler;
-import org.apache.aurora.scheduler.scheduling.TaskScheduler.TaskSchedulerImpl.ReservationDuration;
+import org.apache.aurora.scheduler.scheduling.TaskSchedulerImpl;
+import org.apache.aurora.scheduler.scheduling.TaskSchedulerImpl.ReservationDuration;
import org.apache.aurora.scheduler.state.StateModule;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
@@ -147,8 +149,8 @@ public class SchedulingBenchmarks {
bind(ScheduledExecutorService.class).annotatedWith(AsyncModule.AsyncExecutor.class)
.toInstance(new NoopExecutor());
bind(Deferment.class).to(Deferment.Noop.class);
- bind(OfferManager.class).to(OfferManager.OfferManagerImpl.class);
- bind(OfferManager.OfferManagerImpl.class).in(Singleton.class);
+ bind(OfferManager.class).to(OfferManagerImpl.class);
+ bind(OfferManagerImpl.class).in(Singleton.class);
bind(OfferSettings.class).toInstance(
new OfferSettings(NO_DELAY,
ImmutableList.of(OfferOrder.RANDOM),
@@ -157,8 +159,8 @@ public class SchedulingBenchmarks {
new FakeTicker()));
bind(BiCache.BiCacheSettings.class).toInstance(
new BiCache.BiCacheSettings(DELAY_FOREVER, ""));
- bind(TaskScheduler.class).to(TaskScheduler.TaskSchedulerImpl.class);
- bind(TaskScheduler.TaskSchedulerImpl.class).in(Singleton.class);
+ bind(TaskScheduler.class).to(TaskSchedulerImpl.class);
+ bind(TaskSchedulerImpl.class).in(Singleton.class);
expose(TaskScheduler.class);
expose(OfferManager.class);
}
@@ -171,7 +173,7 @@ public class SchedulingBenchmarks {
.toInstance(DELAY_FOREVER);
bind(TaskIdGenerator.class).to(TaskIdGenerator.TaskIdGeneratorImpl.class);
bind(new TypeLiteral<Amount<Long, Time>>() { })
- .annotatedWith(OffersModule.UnavailabilityThreshold.class)
+ .annotatedWith(OfferManagerModule.UnavailabilityThreshold.class)
.toInstance(Amount.of(1L, Time.MINUTES));
bind(UpdateAgentReserver.class).to(UpdateAgentReserver.NullAgentReserver.class);
bind(UpdateAgentReserver.NullAgentReserver.class).in(Singleton.class);
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java
index 201aa81..05c58ab 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeOfferManager.java
@@ -18,22 +18,23 @@ import com.google.common.base.Optional;
import org.apache.aurora.scheduler.HostOffer;
import org.apache.aurora.scheduler.base.TaskGroupKey;
import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
import org.apache.aurora.scheduler.offers.OfferManager;
import org.apache.mesos.v1.Protos;
public class FakeOfferManager implements OfferManager {
@Override
- public void addOffer(HostOffer offer) {
+ public void add(HostOffer offer) {
// no-op
}
@Override
- public boolean cancelOffer(Protos.OfferID offerId) {
+ public boolean cancel(Protos.OfferID offerId) {
return false;
}
@Override
- public void banOffer(Protos.OfferID offerId) {
+ public void ban(Protos.OfferID offerId) {
// no-op
}
@@ -43,27 +44,33 @@ public class FakeOfferManager implements OfferManager {
}
@Override
- public void banOfferForTaskGroup(Protos.OfferID offerId, TaskGroupKey groupKey) {
+ public void hostAttributesChanged(PubsubEvent.HostAttributesChanged change) {
// no-op
}
@Override
- public Iterable<HostOffer> getOffers(TaskGroupKey groupKey) {
- return null;
+ public Optional<HostOffer> get(Protos.AgentID agentId) {
+ return Optional.absent();
}
@Override
- public void hostAttributesChanged(PubsubEvent.HostAttributesChanged change) {
- // no-op
+ public Iterable<HostOffer> getAll() {
+ return null;
}
@Override
- public Iterable<HostOffer> getOffers() {
- return null;
+ public Optional<HostOffer> getMatching(Protos.AgentID slaveId,
+ ResourceRequest resourceRequest,
+ boolean revocable) {
+
+ return Optional.absent();
}
@Override
- public Optional<HostOffer> getOffer(Protos.AgentID agentId) {
- return Optional.absent();
+ public Iterable<HostOffer> getAllMatching(TaskGroupKey groupKey,
+ ResourceRequest resourceRequest,
+ boolean revocable) {
+
+ return null;
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/AppModule.java b/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
index 3204cca..817a019 100644
--- a/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/app/AppModule.java
@@ -47,7 +47,7 @@ import org.apache.aurora.scheduler.filter.SchedulingFilterImpl;
import org.apache.aurora.scheduler.http.JettyServerModule;
import org.apache.aurora.scheduler.mesos.SchedulerDriverModule;
import org.apache.aurora.scheduler.metadata.MetadataModule;
-import org.apache.aurora.scheduler.offers.OffersModule;
+import org.apache.aurora.scheduler.offers.OfferManagerModule;
import org.apache.aurora.scheduler.preemptor.PreemptorModule;
import org.apache.aurora.scheduler.pruning.PruningModule;
import org.apache.aurora.scheduler.quota.QuotaModule;
@@ -172,7 +172,7 @@ public class AppModule extends AbstractModule {
install(new PubsubEventModule());
install(new AsyncModule(options.async));
- install(new OffersModule(options));
+ install(new OfferManagerModule(options));
install(new PruningModule(options.pruning));
install(new ReconciliationModule(options.reconciliation));
install(new SchedulingModule(options.scheduling));
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/config/CliOptions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/config/CliOptions.java b/src/main/java/org/apache/aurora/scheduler/config/CliOptions.java
index d4537e3..b7f43e0 100644
--- a/src/main/java/org/apache/aurora/scheduler/config/CliOptions.java
+++ b/src/main/java/org/apache/aurora/scheduler/config/CliOptions.java
@@ -36,12 +36,13 @@ import org.apache.aurora.scheduler.http.api.security.IniShiroRealmModule;
import org.apache.aurora.scheduler.http.api.security.Kerberos5ShiroRealmModule;
import org.apache.aurora.scheduler.log.mesos.MesosLogStreamModule;
import org.apache.aurora.scheduler.mesos.CommandLineDriverSettingsModule;
-import org.apache.aurora.scheduler.offers.OffersModule;
+import org.apache.aurora.scheduler.offers.OfferManagerModule;
import org.apache.aurora.scheduler.preemptor.PreemptorModule;
import org.apache.aurora.scheduler.pruning.PruningModule;
import org.apache.aurora.scheduler.reconciliation.ReconciliationModule;
import org.apache.aurora.scheduler.resources.ResourceSettings;
import org.apache.aurora.scheduler.scheduling.SchedulingModule;
+import org.apache.aurora.scheduler.scheduling.TaskAssignerImplModule;
import org.apache.aurora.scheduler.sla.SlaModule;
import org.apache.aurora.scheduler.state.StateModule;
import org.apache.aurora.scheduler.stats.AsyncStatsModule;
@@ -54,7 +55,7 @@ import org.apache.aurora.scheduler.updater.UpdaterModule;
public class CliOptions {
public final ReconciliationModule.Options reconciliation =
new ReconciliationModule.Options();
- public final OffersModule.Options offer = new OffersModule.Options();
+ public final OfferManagerModule.Options offer = new OfferManagerModule.Options();
public final ExecutorModule.Options executor = new ExecutorModule.Options();
public final AppModule.Options app = new AppModule.Options();
public final SchedulerMain.Options main = new SchedulerMain.Options();
@@ -84,6 +85,7 @@ public class CliOptions {
public final StatsModule.Options stats = new StatsModule.Options();
public final CronModule.Options cron = new CronModule.Options();
public final ResourceSettings resourceSettings = new ResourceSettings();
+ public final TaskAssignerImplModule.Options taskAssigner = new TaskAssignerImplModule.Options();
final List<Object> custom;
public CliOptions() {
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java b/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java
index 60f141d..a5acafa 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/AttributeAggregate.java
@@ -49,19 +49,8 @@ public final class AttributeAggregate {
*/
private Supplier<Multiset<Pair<String, String>>> aggregate;
- private boolean isInitialized = false;
-
private AttributeAggregate(Supplier<Multiset<Pair<String, String>>> aggregate) {
- this.aggregate = Suppliers.memoize(
- () -> {
- initialize();
- return aggregate.get();
- }
- );
- }
-
- private void initialize() {
- isInitialized = true; // inlining this assignment yields a PMD false positive
+ this.aggregate = Suppliers.memoize(aggregate);
}
/**
@@ -123,21 +112,16 @@ public final class AttributeAggregate {
}
public void updateAttributeAggregate(IHostAttributes attributes) {
- // If the aggregate supplier has not been populated there is no need to update it here.
- // All tasks attributes will be picked up by the wrapped task query if executed at a
- // later point in time.
- if (isInitialized) {
- final Supplier<Multiset<Pair<String, String>>> previous = aggregate;
- aggregate = Suppliers.memoize(
- () -> {
- ImmutableMultiset.Builder<Pair<String, String>> builder
- = new ImmutableMultiset.Builder<>();
- builder.addAll(previous.get());
- addAttributes(builder, attributes.getAttributes());
- return builder.build();
- }
- );
- }
+ final Supplier<Multiset<Pair<String, String>>> previous = aggregate;
+ aggregate = Suppliers.memoize(
+ () -> {
+ ImmutableMultiset.Builder<Pair<String, String>> builder
+ = new ImmutableMultiset.Builder<>();
+ builder.addAll(previous.get());
+ addAttributes(builder, attributes.getAttributes());
+ return builder.build();
+ }
+ );
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
index 36608a9..a00c095 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
@@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Optional;
+import org.apache.aurora.scheduler.HostOffer;
import org.apache.aurora.scheduler.resources.ResourceBag;
import org.apache.aurora.scheduler.storage.entities.IConstraint;
import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
@@ -258,6 +259,10 @@ public interface SchedulingFilter {
this(offer, attributes, Optional.absent());
}
+ public UnusedResource(HostOffer offer, boolean revocable) {
+ this(offer.getResourceBag(revocable), offer.getAttributes(), offer.getUnavailabilityStart());
+ }
+
public UnusedResource(ResourceBag offer, IHostAttributes attributes, Optional<Instant> start) {
this.offer = offer;
this.attributes = attributes;
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
index df51d4c..41a0764 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
@@ -32,7 +32,7 @@ import org.apache.aurora.common.util.Clock;
import org.apache.aurora.gen.MaintenanceMode;
import org.apache.aurora.gen.TaskConstraint;
import org.apache.aurora.scheduler.configuration.ConfigurationManager;
-import org.apache.aurora.scheduler.offers.OffersModule.UnavailabilityThreshold;
+import org.apache.aurora.scheduler.offers.OfferManagerModule.UnavailabilityThreshold;
import org.apache.aurora.scheduler.resources.ResourceBag;
import org.apache.aurora.scheduler.resources.ResourceType;
import org.apache.aurora.scheduler.storage.entities.IAttribute;
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/http/Offers.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/Offers.java b/src/main/java/org/apache/aurora/scheduler/http/Offers.java
index f22ca6e..bb92cd0 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/Offers.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/Offers.java
@@ -59,7 +59,7 @@ public class Offers {
public Response getOffers() throws JsonProcessingException {
return Response.ok(
mapper.writeValueAsString(
- StreamSupport.stream(offerManager.getOffers().spliterator(), false)
+ StreamSupport.stream(offerManager.getAll().spliterator(), false)
.map(o -> o.getOffer())
.collect(Collectors.toList())))
.build();
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java b/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java
index fd5874d..87e702f 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java
@@ -40,7 +40,7 @@ import org.apache.aurora.scheduler.events.EventSink;
import org.apache.aurora.scheduler.events.PubsubEvent;
import org.apache.aurora.scheduler.events.PubsubEventModule;
import org.apache.aurora.scheduler.offers.OfferManager;
-import org.apache.aurora.scheduler.offers.OffersModule;
+import org.apache.aurora.scheduler.offers.OfferManagerModule;
import org.apache.aurora.scheduler.state.MaintenanceController;
import org.apache.aurora.scheduler.storage.AttributeStore;
import org.apache.aurora.scheduler.storage.Storage;
@@ -126,7 +126,7 @@ public interface MesosCallbackHandler {
Driver driver,
Clock clock,
MaintenanceController controller,
- @OffersModule.UnavailabilityThreshold Amount<Long, Time> unavailabilityThreshold,
+ @OfferManagerModule.UnavailabilityThreshold Amount<Long, Time> unavailabilityThreshold,
@PubsubEventModule.RegisteredEvents EventSink registeredEventSink) {
this(
@@ -226,7 +226,7 @@ public interface MesosCallbackHandler {
storeProvider.getAttributeStore().saveHostAttributes(attributes);
log.info("Received offer: {}", offer.getId().getValue());
offersReceived.incrementAndGet();
- offerManager.addOffer(new HostOffer(offer, attributes));
+ offerManager.add(new HostOffer(offer, attributes));
}
});
});
@@ -244,15 +244,15 @@ public interface MesosCallbackHandler {
// In this scenario, we want to ensure that we do not use it/accept it when the executor
// finally processes the offer. We will temporarily ban it and add a command for the
// executor to unban it so future offers can be processed normally.
- boolean offerCancelled = offerManager.cancelOffer(offerId);
+ boolean offerCancelled = offerManager.cancel(offerId);
if (!offerCancelled) {
log.info(
"Received rescind before adding offer: {}, temporarily banning.",
offerId.getValue());
- offerManager.banOffer(offerId);
+ offerManager.ban(offerId);
executor.execute(() -> {
log.info("Cancelling and unbanning offer: {}.", offerId.getValue());
- offerManager.cancelOffer(offerId);
+ offerManager.cancel(offerId);
});
}
offersRescinded.incrementAndGet();
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/offers/HostOffers.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/HostOffers.java b/src/main/java/org/apache/aurora/scheduler/offers/HostOffers.java
new file mode 100644
index 0000000..8adbcb1
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/offers/HostOffers.java
@@ -0,0 +1,253 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.offers;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.cache.Cache;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import org.apache.aurora.common.collections.Pair;
+import org.apache.aurora.common.stats.StatsProvider;
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.base.TaskGroupKey;
+import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.mesos.v1.Protos;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A container for the data structures used by {@link OfferManagerImpl}, to make it easier to
+ * reason about the different indices used and their consistency.
+ */
+class HostOffers {
+ private final Set<HostOffer> offers;
+
+ private final Map<Protos.OfferID, HostOffer> offersById = Maps.newHashMap();
+ private final Map<Protos.AgentID, HostOffer> offersBySlave = Maps.newHashMap();
+ private final Map<String, HostOffer> offersByHost = Maps.newHashMap();
+
+ // Keep track of offer->groupKey mappings that will never be matched to avoid redundant
+ // scheduling attempts. See VetoGroup for more details on static ban.
+ private final Cache<Pair<Protos.OfferID, TaskGroupKey>, Boolean> staticallyBannedOffers;
+ private final SchedulingFilter schedulingFilter;
+
+ // Keep track of globally banned offers that will never be matched to anything.
+ private final Set<Protos.OfferID> globallyBannedOffers = Sets.newHashSet();
+
+ // Keep track of the number of offers evaluated for vetoes when getting matching offers
+ private final AtomicLong vetoEvaluatedOffers;
+
+ HostOffers(StatsProvider statsProvider,
+ OfferSettings offerSettings,
+ SchedulingFilter schedulingFilter) {
+ this.offers = new ConcurrentSkipListSet<>(offerSettings.getOrdering());
+ this.staticallyBannedOffers = offerSettings
+ .getStaticBanCacheBuilder()
+ .build();
+ this.schedulingFilter = requireNonNull(schedulingFilter);
+
+ // Potential gotcha - since this is a ConcurrentSkipListSet, size() is more expensive.
+ // Could track this separately if it turns out to pose problems.
+ statsProvider.exportSize(OfferManagerImpl.OUTSTANDING_OFFERS, offers);
+ statsProvider.makeGauge(OfferManagerImpl.STATICALLY_BANNED_OFFERS,
+ staticallyBannedOffers::size);
+ statsProvider.makeGauge(OfferManagerImpl.STATICALLY_BANNED_OFFERS_HIT_RATE,
+ () -> staticallyBannedOffers.stats().hitRate());
+ statsProvider.makeGauge(OfferManagerImpl.GLOBALLY_BANNED_OFFERS, globallyBannedOffers::size);
+
+ vetoEvaluatedOffers = statsProvider.makeCounter(OfferManagerImpl.VETO_EVALUATED_OFFERS);
+ }
+
+ /**
+ * Adds an offer while maintaining a guarantee that no two offers may exist with the same
+ * agent ID. If an offer exists with the same agent ID, the existing offer is removed
+ * and returned, and {@code offer} is not added.
+ *
+ * @param offer Offer to add.
+ * @return The pre-existing offer with the same agent ID as {@code offer}, if one exists,
+ * which will also be removed prior to returning.
+ */
+ synchronized Optional<HostOffer> addAndPreventAgentCollision(HostOffer offer) {
+ HostOffer sameAgent = offersBySlave.get(offer.getOffer().getAgentId());
+ if (sameAgent != null) {
+ remove(sameAgent.getOffer().getId());
+ return Optional.of(sameAgent);
+ }
+
+ addInternal(offer);
+ return Optional.absent();
+ }
+
+ private void addInternal(HostOffer offer) {
+ offers.add(offer);
+ offersById.put(offer.getOffer().getId(), offer);
+ offersBySlave.put(offer.getOffer().getAgentId(), offer);
+ offersByHost.put(offer.getOffer().getHostname(), offer);
+ }
+
+ synchronized boolean remove(Protos.OfferID id) {
+ HostOffer removed = offersById.remove(id);
+ if (removed != null) {
+ offers.remove(removed);
+ offersBySlave.remove(removed.getOffer().getAgentId());
+ offersByHost.remove(removed.getOffer().getHostname());
+ }
+ globallyBannedOffers.remove(id);
+ return removed != null;
+ }
+
+ synchronized void addGlobalBan(Protos.OfferID offerId) {
+ globallyBannedOffers.add(offerId);
+ }
+
+ synchronized void updateHostAttributes(IHostAttributes attributes) {
+ HostOffer offer = offersByHost.remove(attributes.getHost());
+ if (offer != null) {
+ // Remove and re-add a host's offer to re-sort based on its new hostStatus
+ remove(offer.getOffer().getId());
+ addInternal(new HostOffer(offer.getOffer(), attributes));
+ }
+ }
+
+ synchronized Optional<HostOffer> get(Protos.AgentID slaveId) {
+ HostOffer offer = offersBySlave.get(slaveId);
+ if (offer == null || globallyBannedOffers.contains(offer.getOffer().getId())) {
+ return Optional.absent();
+ }
+
+ return Optional.of(offer);
+ }
+
+ /**
+ * Returns an iterable giving the state of the offers at the time the method is called. Unlike
+ * {@code getAllMatching}, the underlying collection is a copy of the original and
+ * will not be modified outside of the returned iterable.
+ *
+ * @return The offers currently known by the scheduler.
+ */
+ synchronized Iterable<HostOffer> getOffers() {
+ return FluentIterable.from(offers)
+ .filter(o -> !globallyBannedOffers.contains(o.getOffer().getId()))
+ .toSet();
+ }
+
+ synchronized Optional<HostOffer> getMatching(Protos.AgentID slaveId,
+ ResourceRequest resourceRequest,
+ boolean revocable) {
+
+ Optional<HostOffer> optionalOffer = get(slaveId);
+ if (optionalOffer.isPresent()) {
+ HostOffer offer = optionalOffer.get();
+
+ if (isGloballyBanned(offer)
+ || isVetoed(offer, resourceRequest, revocable, Optional.absent())) {
+
+ return Optional.absent();
+ }
+ }
+
+ return optionalOffer;
+ }
+
+ /**
+ * Returns a weakly-consistent iterable giving the available offers to a given
+ * {@code groupKey}. This iterable can handle concurrent operations on its underlying
+ * collection, and may reflect changes that happen after the construction of the iterable.
+ * This property is mainly used in {@code launchTask}.
+ *
+ * @param groupKey The task group to get offers for.
+ * @return The offers a given task group can use.
+ */
+ synchronized Iterable<HostOffer> getAllMatching(TaskGroupKey groupKey,
+ ResourceRequest resourceRequest,
+ boolean revocable) {
+
+ return Iterables.unmodifiableIterable(FluentIterable.from(offers)
+ .filter(o -> !isGloballyBanned(o))
+ .filter(o -> !isStaticallyBanned(o, groupKey))
+ .filter(HostOffer::hasCpuAndMem)
+ .filter(o -> !isVetoed(o, resourceRequest, revocable, Optional.of(groupKey))));
+ }
+
+ private synchronized boolean isGloballyBanned(HostOffer offer) {
+ return globallyBannedOffers.contains(offer.getOffer().getId());
+ }
+
+ private synchronized boolean isStaticallyBanned(HostOffer offer, TaskGroupKey groupKey) {
+ return staticallyBannedOffers.getIfPresent(Pair.of(offer.getOffer().getId(), groupKey)) != null;
+ }
+
+ /**
+ * Determine whether or not the {@link HostOffer} is vetoed for the given {@link ResourceRequest}.
+ * If {@code groupKey} is present and the vetoes are static, this method also adds a static ban
+ * so that the offer is no longer matched against the {@link TaskGroupKey}.
+ */
+ private boolean isVetoed(HostOffer offer,
+ ResourceRequest resourceRequest,
+ boolean revocable,
+ Optional<TaskGroupKey> groupKey) {
+
+ vetoEvaluatedOffers.incrementAndGet();
+ UnusedResource unusedResource = new UnusedResource(offer, revocable);
+ Set<Veto> vetoes = schedulingFilter.filter(unusedResource, resourceRequest);
+ if (!vetoes.isEmpty()) {
+ if (groupKey.isPresent() && Veto.identifyGroup(vetoes) == SchedulingFilter.VetoGroup.STATIC) {
+ addStaticGroupBan(offer.getOffer().getId(), groupKey.get());
+ }
+
+ return true;
+ }
+
+ return false;
+ }
+
+ @VisibleForTesting
+ synchronized void addStaticGroupBan(Protos.OfferID offerId, TaskGroupKey groupKey) {
+ if (offersById.containsKey(offerId)) {
+ staticallyBannedOffers.put(Pair.of(offerId, groupKey), true);
+ }
+ }
+
+ @VisibleForTesting
+ synchronized Set<Pair<Protos.OfferID, TaskGroupKey>> getStaticBans() {
+ return staticallyBannedOffers.asMap().keySet();
+ }
+
+ synchronized void clear() {
+ offers.clear();
+ offersById.clear();
+ offersBySlave.clear();
+ offersByHost.clear();
+ staticallyBannedOffers.invalidateAll();
+ globallyBannedOffers.clear();
+ }
+
+ @VisibleForTesting
+ synchronized void cleanUpStaticallyBannedOffers() {
+ staticallyBannedOffers.cleanUp();
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
index 96b0f46..0349215 100644
--- a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
@@ -13,41 +13,16 @@
*/
package org.apache.aurora.scheduler.offers;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.inject.Inject;
-
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
-import com.google.common.cache.Cache;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.eventbus.Subscribe;
-import org.apache.aurora.common.collections.Pair;
-import org.apache.aurora.common.inject.TimedInterceptor.Timed;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.common.stats.StatsProvider;
import org.apache.aurora.scheduler.HostOffer;
import org.apache.aurora.scheduler.base.TaskGroupKey;
-import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
-import org.apache.aurora.scheduler.mesos.Driver;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
import org.apache.mesos.v1.Protos;
import org.apache.mesos.v1.Protos.AgentID;
-import org.apache.mesos.v1.Protos.Offer.Operation;
import org.apache.mesos.v1.Protos.OfferID;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static java.util.Objects.requireNonNull;
import static org.apache.aurora.scheduler.events.PubsubEvent.HostAttributesChanged;
@@ -61,7 +36,7 @@ public interface OfferManager extends EventSubscriber {
*
* @param offer Newly-available resource offer.
*/
- void addOffer(HostOffer offer);
+ void add(HostOffer offer);
/**
* Invalidates an offer. This indicates that the scheduler should not attempt to match any
@@ -70,62 +45,70 @@ public interface OfferManager extends EventSubscriber {
* @param offerId Cancelled offer.
* @return A boolean on whether or not the offer was successfully cancelled.
*/
- boolean cancelOffer(OfferID offerId);
+ boolean cancel(OfferID offerId);
/**
* Exclude an offer from being matched against all tasks.
*
* @param offerId Offer ID to ban.
*/
- void banOffer(OfferID offerId);
+ void ban(OfferID offerId);
/**
- * Exclude an offer that results in a static mismatch from further attempts to match against all
- * tasks from the same group.
+ * Notifies the offer queue that a host's attributes have changed.
*
- * @param offerId Offer ID to exclude for the given {@code groupKey}.
- * @param groupKey Task group key to exclude.
+ * @param change State change notification.
*/
- void banOfferForTaskGroup(OfferID offerId, TaskGroupKey groupKey);
+ void hostAttributesChanged(HostAttributesChanged change);
/**
- * Launches the task matched against the offer.
+ * Gets the offer for the given slave ID.
*
- * @param offerId Matched offer ID.
- * @param task Matched task info.
- * @throws LaunchException If there was an error launching the task.
+ * @param slaveId Slave ID to get the offer for.
+ * @return The offer for the slave ID.
*/
- void launchTask(OfferID offerId, Protos.TaskInfo task) throws LaunchException;
+ Optional<HostOffer> get(AgentID slaveId);
/**
- * Notifies the offer queue that a host's attributes have changed.
+ * Gets all offers that the scheduler is holding, excluding banned offers.
*
- * @param change State change notification.
+ * @return A snapshot of the offers that the scheduler is currently holding.
*/
- void hostAttributesChanged(HostAttributesChanged change);
+ Iterable<HostOffer> getAll();
/**
- * Gets the offers that the scheduler is holding, excluding banned offers.
+ * Gets the offer for the given slave ID if it satisfies the supplied {@link ResourceRequest}.
*
- * @return A snapshot of the offers that the scheduler is currently holding.
+ * @param slaveId Slave ID to get the offer for.
+ * @param resourceRequest The request that the offer should satisfy.
+ * @param revocable Whether or not the request can use revocable resources.
+ * @return An option containing the offer for the slave ID if it fits.
*/
- Iterable<HostOffer> getOffers();
+ Optional<HostOffer> getMatching(AgentID slaveId,
+ ResourceRequest resourceRequest,
+ boolean revocable);
/**
- * Gets all offers that are not banned for the given {@code groupKey}.
+ * Gets all offers that the scheduler is holding that satisfy the supplied
+ * {@link ResourceRequest}.
*
- * @param groupKey Task group key to check offers for.
- * @return A snapshot of all offers eligible for the given {@code groupKey}.
+ * @param groupKey The {@link TaskGroupKey} of the task in the {@link ResourceRequest}.
+ * @param resourceRequest The request that the offer should satisfy.
+ * @param revocable Whether or not the request can use revocable resources.
+ * @return The offers that satisfy the supplied request.
*/
- Iterable<HostOffer> getOffers(TaskGroupKey groupKey);
+ Iterable<HostOffer> getAllMatching(TaskGroupKey groupKey,
+ ResourceRequest resourceRequest,
+ boolean revocable);
/**
- * Gets an offer for the given slave ID.
+ * Launches the task matched against the offer.
*
- * @param slaveId Slave ID to get offer for.
- * @return An offer for the slave ID.
+ * @param offerId Matched offer ID.
+ * @param task Matched task info.
+ * @throws LaunchException If there was an error launching the task.
*/
- Optional<HostOffer> getOffer(AgentID slaveId);
+ void launchTask(OfferID offerId, Protos.TaskInfo task) throws LaunchException;
/**
* Thrown when there was an unexpected failure trying to launch a task.
@@ -140,319 +123,4 @@ public interface OfferManager extends EventSubscriber {
super(msg, cause);
}
}
-
- class OfferManagerImpl implements OfferManager {
- @VisibleForTesting
- static final Logger LOG = LoggerFactory.getLogger(OfferManagerImpl.class);
- @VisibleForTesting
- static final String OFFER_ACCEPT_RACES = "offer_accept_races";
- @VisibleForTesting
- static final String OUTSTANDING_OFFERS = "outstanding_offers";
- @VisibleForTesting
- static final String STATICALLY_BANNED_OFFERS = "statically_banned_offers_size";
- @VisibleForTesting
- static final String STATICALLY_BANNED_OFFERS_HIT_RATE = "statically_banned_offers_hit_rate";
- @VisibleForTesting
- static final String OFFER_CANCEL_FAILURES = "offer_cancel_failures";
- @VisibleForTesting
- static final String GLOBALLY_BANNED_OFFERS = "globally_banned_offers_size";
-
- private final HostOffers hostOffers;
- private final AtomicLong offerRaces;
- private final AtomicLong offerCancelFailures;
-
- private final Driver driver;
- private final OfferSettings offerSettings;
- private final Deferment offerDecline;
-
- @Inject
- @VisibleForTesting
- public OfferManagerImpl(
- Driver driver,
- OfferSettings offerSettings,
- StatsProvider statsProvider,
- Deferment offerDecline) {
-
- this.driver = requireNonNull(driver);
- this.offerSettings = requireNonNull(offerSettings);
- this.hostOffers = new HostOffers(statsProvider, offerSettings);
- this.offerRaces = statsProvider.makeCounter(OFFER_ACCEPT_RACES);
- this.offerCancelFailures = statsProvider.makeCounter(OFFER_CANCEL_FAILURES);
- this.offerDecline = requireNonNull(offerDecline);
- }
-
- @Override
- public void addOffer(HostOffer offer) {
- Optional<HostOffer> sameAgent = hostOffers.addAndPreventAgentCollision(offer);
- if (sameAgent.isPresent()) {
- // We have an existing offer for the same agent. We choose to return both offers so that
- // they may be combined into a single offer.
- LOG.info("Returning offers for " + offer.getOffer().getAgentId().getValue()
- + " for compaction.");
- decline(offer.getOffer().getId());
- decline(sameAgent.get().getOffer().getId());
- } else {
- offerDecline.defer(() -> removeAndDecline(offer.getOffer().getId()));
- }
- }
-
- private void removeAndDecline(OfferID id) {
- if (removeFromHostOffers(id)) {
- decline(id);
- }
- }
-
- private void decline(OfferID id) {
- LOG.debug("Declining offer {}", id);
- driver.declineOffer(id, getOfferFilter());
- }
-
- private Protos.Filters getOfferFilter() {
- return Protos.Filters.newBuilder()
- .setRefuseSeconds(offerSettings.getFilterDuration().as(Time.SECONDS))
- .build();
- }
-
- @Override
- public boolean cancelOffer(final OfferID offerId) {
- boolean success = removeFromHostOffers(offerId);
- if (!success) {
- // This will happen rarely when we race to process this rescind against accepting the offer
- // to launch a task.
- // If it happens frequently, we are likely processing rescinds before the offer itself.
- LOG.warn("Failed to cancel offer: {}.", offerId.getValue());
- this.offerCancelFailures.incrementAndGet();
- }
- return success;
- }
-
- @Override
- public void banOffer(OfferID offerId) {
- hostOffers.addGlobalBan(offerId);
- }
-
- private boolean removeFromHostOffers(final OfferID offerId) {
- requireNonNull(offerId);
-
- // The small risk of inconsistency is acceptable here - if we have an accept/remove race
- // on an offer, the master will mark the task as LOST and it will be retried.
- return hostOffers.remove(offerId);
- }
-
- @Override
- public Iterable<HostOffer> getOffers() {
- return hostOffers.getOffers();
- }
-
- @Override
- public Iterable<HostOffer> getOffers(TaskGroupKey groupKey) {
- return hostOffers.getWeaklyConsistentOffers(groupKey);
- }
-
- @Override
- public Optional<HostOffer> getOffer(AgentID slaveId) {
- return hostOffers.get(slaveId);
- }
-
- /**
- * Updates the preference of a host's offers.
- *
- * @param change Host change notification.
- */
- @Subscribe
- public void hostAttributesChanged(HostAttributesChanged change) {
- hostOffers.updateHostAttributes(change.getAttributes());
- }
-
- /**
- * Notifies the queue that the driver is disconnected, and all the stored offers are now
- * invalid.
- * <p>
- * The queue takes this as a signal to flush its queue.
- *
- * @param event Disconnected event.
- */
- @Subscribe
- public void driverDisconnected(DriverDisconnected event) {
- LOG.info("Clearing stale offers since the driver is disconnected.");
- hostOffers.clear();
- }
-
- /**
- * Used for testing to ensure that the underlying cache's `size` method returns an accurate
- * value by not including evicted entries.
- */
- @VisibleForTesting
- public void cleanupStaticBans() {
- hostOffers.staticallyBannedOffers.cleanUp();
- }
-
- /**
- * A container for the data structures used by this class, to make it easier to reason about
- * the different indices used and their consistency.
- */
- private static class HostOffers {
-
- private final Set<HostOffer> offers;
- private final Map<OfferID, HostOffer> offersById = Maps.newHashMap();
- private final Map<AgentID, HostOffer> offersBySlave = Maps.newHashMap();
- private final Map<String, HostOffer> offersByHost = Maps.newHashMap();
-
- // Keep track of offer->groupKey mappings that will never be matched to avoid redundant
- // scheduling attempts. See VetoGroup for more details on static ban.
- private final Cache<Pair<OfferID, TaskGroupKey>, Boolean> staticallyBannedOffers;
-
- // Keep track of globally banned offers that will never be matched to anything.
- private final Set<OfferID> globallyBannedOffers = Sets.newConcurrentHashSet();
-
- HostOffers(StatsProvider statsProvider, OfferSettings offerSettings) {
- offers = new ConcurrentSkipListSet<>(offerSettings.getOrdering());
- staticallyBannedOffers = offerSettings
- .getStaticBanCacheBuilder()
- .build();
- // Potential gotcha - since this is a ConcurrentSkipListSet, size() is more expensive.
- // Could track this separately if it turns out to pose problems.
- statsProvider.exportSize(OUTSTANDING_OFFERS, offers);
- statsProvider.makeGauge(STATICALLY_BANNED_OFFERS, staticallyBannedOffers::size);
- statsProvider.makeGauge(STATICALLY_BANNED_OFFERS_HIT_RATE,
- () -> staticallyBannedOffers.stats().hitRate());
- statsProvider.makeGauge(GLOBALLY_BANNED_OFFERS, globallyBannedOffers::size);
- }
-
- synchronized Optional<HostOffer> get(AgentID slaveId) {
- HostOffer offer = offersBySlave.get(slaveId);
- if (offer == null || globallyBannedOffers.contains(offer.getOffer().getId())) {
- return Optional.absent();
- }
-
- return Optional.of(offer);
- }
-
- /**
- * Adds an offer while maintaining a guarantee that no two offers may exist with the same
- * agent ID. If an offer exists with the same agent ID, the existing offer is removed
- * and returned, and {@code offer} is not added.
- *
- * @param offer Offer to add.
- * @return The pre-existing offer with the same agent ID as {@code offer}, if one exists,
- * which will also be removed prior to returning.
- */
- synchronized Optional<HostOffer> addAndPreventAgentCollision(HostOffer offer) {
- HostOffer sameAgent = offersBySlave.get(offer.getOffer().getAgentId());
- if (sameAgent != null) {
- remove(sameAgent.getOffer().getId());
- return Optional.of(sameAgent);
- }
-
- addInternal(offer);
- return Optional.absent();
- }
-
- private void addInternal(HostOffer offer) {
- offers.add(offer);
- offersById.put(offer.getOffer().getId(), offer);
- offersBySlave.put(offer.getOffer().getAgentId(), offer);
- offersByHost.put(offer.getOffer().getHostname(), offer);
- }
-
- synchronized boolean remove(OfferID id) {
- HostOffer removed = offersById.remove(id);
- if (removed != null) {
- offers.remove(removed);
- offersBySlave.remove(removed.getOffer().getAgentId());
- offersByHost.remove(removed.getOffer().getHostname());
- }
- globallyBannedOffers.remove(id);
- return removed != null;
- }
-
- synchronized void updateHostAttributes(IHostAttributes attributes) {
- HostOffer offer = offersByHost.remove(attributes.getHost());
- if (offer != null) {
- // Remove and re-add a host's offer to re-sort based on its new hostStatus
- remove(offer.getOffer().getId());
- addInternal(new HostOffer(offer.getOffer(), attributes));
- }
- }
-
- /**
- * Returns an iterable giving the state of the offers at the time the method is called. Unlike
- * {@code getWeaklyConsistentOffers}, the underlying collection is a copy of the original and
- * will not be modified outside of the returned iterable.
- *
- * @return The offers currently known by the scheduler.
- */
- synchronized Iterable<HostOffer> getOffers() {
- return FluentIterable.from(offers).filter(
- e -> !globallyBannedOffers.contains(e.getOffer().getId())
- ).toSet();
- }
-
- /**
- * Returns a weakly-consistent iterable giving the available offers to a given
- * {@code groupKey}. This iterable can handle concurrent operations on its underlying
- * collection, and may reflect changes that happen after the construction of the iterable.
- * This property is mainly used in {@code launchTask}.
- *
- * @param groupKey The task group to get offers for.
- * @return The offers a given task group can use.
- */
- synchronized Iterable<HostOffer> getWeaklyConsistentOffers(TaskGroupKey groupKey) {
- return Iterables.unmodifiableIterable(FluentIterable.from(offers).filter(e ->
- staticallyBannedOffers.getIfPresent(Pair.of(e.getOffer().getId(), groupKey)) == null
- && !globallyBannedOffers.contains(e.getOffer().getId())));
- }
-
- synchronized void addGlobalBan(OfferID offerId) {
- globallyBannedOffers.add(offerId);
- }
-
- synchronized void addStaticGroupBan(OfferID offerId, TaskGroupKey groupKey) {
- if (offersById.containsKey(offerId)) {
- staticallyBannedOffers.put(Pair.of(offerId, groupKey), true);
- }
- }
-
- synchronized void clear() {
- offers.clear();
- offersById.clear();
- offersBySlave.clear();
- offersByHost.clear();
- staticallyBannedOffers.invalidateAll();
- globallyBannedOffers.clear();
- }
- }
-
- @Override
- public void banOfferForTaskGroup(OfferID offerId, TaskGroupKey groupKey) {
- hostOffers.addStaticGroupBan(offerId, groupKey);
- }
-
- @Timed("offer_manager_launch_task")
- @Override
- public void launchTask(OfferID offerId, Protos.TaskInfo task) throws LaunchException {
- // Guard against an offer being removed after we grabbed it from the iterator.
- // If that happens, the offer will not exist in hostOffers, and we can immediately
- // send it back to LOST for quick reschedule.
- // Removing while iterating counts on the use of a weakly-consistent iterator being used,
- // which is a feature of ConcurrentSkipListSet.
- if (hostOffers.remove(offerId)) {
- try {
- Operation launch = Operation.newBuilder()
- .setType(Operation.Type.LAUNCH)
- .setLaunch(Operation.Launch.newBuilder().addTaskInfos(task))
- .build();
- driver.acceptOffers(offerId, ImmutableList.of(launch), getOfferFilter());
- } catch (IllegalStateException e) {
- // TODO(William Farner): Catch only the checked exception produced by Driver
- // once it changes from throwing IllegalStateException when the driver is not yet
- // registered.
- throw new LaunchException("Failed to launch task.", e);
- }
- } else {
- offerRaces.incrementAndGet();
- throw new LaunchException("Offer no longer exists in offer queue, likely data race.");
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerImpl.java
new file mode 100644
index 0000000..427b1b4
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerImpl.java
@@ -0,0 +1,246 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.offers;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.eventbus.Subscribe;
+
+import org.apache.aurora.common.collections.Pair;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.stats.StatsProvider;
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.base.TaskGroupKey;
+import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
+import org.apache.aurora.scheduler.mesos.Driver;
+import org.apache.mesos.v1.Protos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Objects.requireNonNull;
+
+import static org.apache.aurora.common.inject.TimedInterceptor.Timed;
+
+/**
+ * Default {@link OfferManager} implementation.
+ * <p>
+ * Bookkeeping of outstanding offers and bans is delegated to a {@link HostOffers} instance, while
+ * accepting and declining offers is done through the {@link Driver}.
+ */
+public class OfferManagerImpl implements OfferManager {
+  private static final Logger LOG = LoggerFactory.getLogger(OfferManagerImpl.class);
+
+  // Stat names. Only OFFER_ACCEPT_RACES and OFFER_CANCEL_FAILURES are registered by this class;
+  // the remaining names are not referenced here (presumably consumed by HostOffers - confirm).
+  @VisibleForTesting
+  static final String OFFER_ACCEPT_RACES = "offer_accept_races";
+  @VisibleForTesting
+  static final String OUTSTANDING_OFFERS = "outstanding_offers";
+  @VisibleForTesting
+  static final String STATICALLY_BANNED_OFFERS = "statically_banned_offers_size";
+  @VisibleForTesting
+  static final String STATICALLY_BANNED_OFFERS_HIT_RATE = "statically_banned_offers_hit_rate";
+  @VisibleForTesting
+  static final String OFFER_CANCEL_FAILURES = "offer_cancel_failures";
+  @VisibleForTesting
+  static final String GLOBALLY_BANNED_OFFERS = "globally_banned_offers_size";
+  @VisibleForTesting
+  static final String VETO_EVALUATED_OFFERS = "veto_evaluated_offers";
+
+  private final HostOffers hostOffers;
+  // Incremented when launchTask() finds that the requested offer is no longer held.
+  private final AtomicLong offerRaces;
+  // Incremented when cancel() is asked to remove an offer that is not held.
+  private final AtomicLong offerCancelFailures;
+
+  private final Driver driver;
+  private final OfferSettings offerSettings;
+  private final Deferment offerDecline;
+
+  /**
+   * Creates an offer manager.
+   *
+   * @param driver Driver used to accept and decline offers.
+   * @param offerSettings Settings; the filter duration is read here, and the settings are also
+   *                      handed to the underlying {@link HostOffers}.
+   * @param statsProvider Provider used to create the race and cancel-failure counters.
+   * @param offerDecline Deferment used to schedule the eventual decline of a held offer.
+   * @param schedulingFilter Filter handed to the underlying {@link HostOffers}.
+   */
+  @Inject
+  @VisibleForTesting
+  public OfferManagerImpl(
+      Driver driver,
+      OfferSettings offerSettings,
+      StatsProvider statsProvider,
+      Deferment offerDecline,
+      SchedulingFilter schedulingFilter) {
+
+    this.driver = requireNonNull(driver);
+    this.offerSettings = requireNonNull(offerSettings);
+    this.hostOffers = new HostOffers(statsProvider, offerSettings, schedulingFilter);
+    this.offerRaces = statsProvider.makeCounter(OFFER_ACCEPT_RACES);
+    this.offerCancelFailures = statsProvider.makeCounter(OFFER_CANCEL_FAILURES);
+    this.offerDecline = requireNonNull(offerDecline);
+  }
+
+  @Override
+  public void add(HostOffer offer) {
+    Optional<HostOffer> sameAgent = hostOffers.addAndPreventAgentCollision(offer);
+    if (sameAgent.isPresent()) {
+      // We have an existing offer for the same agent. We choose to return both offers so that
+      // they may be combined into a single offer.
+      LOG.info("Returning offers for {} for compaction.",
+          offer.getOffer().getAgentId().getValue());
+      decline(offer.getOffer().getId());
+      decline(sameAgent.get().getOffer().getId());
+    } else {
+      // No collision: hold the offer and let the deferment decide when to give it back.
+      offerDecline.defer(() -> removeAndDecline(offer.getOffer().getId()));
+    }
+  }
+
+  /**
+   * Declines an offer, but only if it was still held (i.e. not already consumed or cancelled).
+   *
+   * @param id ID of the offer to return.
+   */
+  private void removeAndDecline(Protos.OfferID id) {
+    if (removeFromHostOffers(id)) {
+      decline(id);
+    }
+  }
+
+  /**
+   * Declines an offer through the driver, applying the configured refuse filter.
+   *
+   * @param id ID of the offer to decline.
+   */
+  private void decline(Protos.OfferID id) {
+    LOG.debug("Declining offer {}", id);
+    driver.declineOffer(id, getOfferFilter());
+  }
+
+  /**
+   * Builds the filter sent with accept/decline calls; its refuse duration is the configured
+   * offer filter duration, expressed in seconds.
+   */
+  private Protos.Filters getOfferFilter() {
+    return Protos.Filters.newBuilder()
+        .setRefuseSeconds(offerSettings.getFilterDuration().as(Time.SECONDS))
+        .build();
+  }
+
+  /**
+   * Removes a held offer.
+   *
+   * @param offerId ID of the offer to remove.
+   * @return {@code true} if the offer was held and has been removed, {@code false} otherwise (in
+   *         which case the cancel-failure counter is incremented).
+   */
+  @Override
+  public boolean cancel(final Protos.OfferID offerId) {
+    boolean success = removeFromHostOffers(offerId);
+    if (!success) {
+      // This will happen rarely when we race to process this rescind against accepting the offer
+      // to launch a task.
+      // If it happens frequently, we are likely processing rescinds before the offer itself.
+      LOG.warn("Failed to cancel offer: {}.", offerId.getValue());
+      this.offerCancelFailures.incrementAndGet();
+    }
+    return success;
+  }
+
+  /**
+   * Removes an offer from the held set.
+   *
+   * @param offerId ID of the offer to remove, must not be {@code null}.
+   * @return The result of the underlying removal.
+   */
+  private boolean removeFromHostOffers(final Protos.OfferID offerId) {
+    requireNonNull(offerId);
+
+    // The small risk of inconsistency is acceptable here - if we have an accept/remove race
+    // on an offer, the master will mark the task as LOST and it will be retried.
+    return hostOffers.remove(offerId);
+  }
+
+  @Override
+  public void ban(Protos.OfferID offerId) {
+    hostOffers.addGlobalBan(offerId);
+  }
+
+  /**
+   * Updates the preference of a host's offers.
+   *
+   * @param change Host change notification.
+   */
+  @Subscribe
+  public void hostAttributesChanged(PubsubEvent.HostAttributesChanged change) {
+    hostOffers.updateHostAttributes(change.getAttributes());
+  }
+
+  @Override
+  public Optional<HostOffer> get(Protos.AgentID slaveId) {
+    return hostOffers.get(slaveId);
+  }
+
+  @Override
+  public Iterable<HostOffer> getAll() {
+    return hostOffers.getOffers();
+  }
+
+  @Override
+  public Optional<HostOffer> getMatching(Protos.AgentID slaveId,
+                                         ResourceRequest resourceRequest,
+                                         boolean revocable) {
+
+    return hostOffers.getMatching(slaveId, resourceRequest, revocable);
+  }
+
+  @Override
+  public Iterable<HostOffer> getAllMatching(TaskGroupKey groupKey,
+                                            ResourceRequest resourceRequest,
+                                            boolean revocable) {
+
+    return hostOffers.getAllMatching(groupKey, resourceRequest, revocable);
+  }
+
+  /**
+   * Notifies the offer manager that the driver is disconnected, and all the stored offers are
+   * now invalid.
+   * <p>
+   * The offer manager takes this as a signal to drop every offer it holds.
+   *
+   * @param event Disconnected event.
+   */
+  @Subscribe
+  public void driverDisconnected(PubsubEvent.DriverDisconnected event) {
+    LOG.info("Clearing stale offers since the driver is disconnected.");
+    hostOffers.clear();
+  }
+
+  /**
+   * Launches a task against a held offer.
+   *
+   * @param offerId ID of the offer to accept.
+   * @param task Task to launch with the offer.
+   * @throws LaunchException If the offer is no longer held, or if the driver rejects the accept
+   *                         call with an {@link IllegalStateException}.
+   */
+  @Timed("offer_manager_launch_task")
+  @Override
+  public void launchTask(Protos.OfferID offerId, Protos.TaskInfo task) throws LaunchException {
+    // Guard against an offer being removed after we grabbed it from the iterator.
+    // If that happens, the offer will not exist in hostOffers, and we can immediately
+    // send it back to LOST for quick reschedule.
+    // Removing while iterating relies on the iterator being weakly consistent, which is a
+    // feature of ConcurrentSkipListSet.
+    if (hostOffers.remove(offerId)) {
+      try {
+        Protos.Offer.Operation launch = Protos.Offer.Operation.newBuilder()
+            .setType(Protos.Offer.Operation.Type.LAUNCH)
+            .setLaunch(Protos.Offer.Operation.Launch.newBuilder().addTaskInfos(task))
+            .build();
+        driver.acceptOffers(offerId, ImmutableList.of(launch), getOfferFilter());
+      } catch (IllegalStateException e) {
+        // TODO(William Farner): Catch only the checked exception produced by Driver
+        // once it changes from throwing IllegalStateException when the driver is not yet
+        // registered.
+        // Note: the offer was already removed above and is not re-added on this path.
+        throw new LaunchException("Failed to launch task.", e);
+      }
+    } else {
+      offerRaces.incrementAndGet();
+      throw new LaunchException("Offer no longer exists in offer queue, likely data race.");
+    }
+  }
+
+  /**
+   * Get all static bans.
+   *
+   * @return The (offer, task group) pairs reported by the underlying {@link HostOffers}.
+   */
+  @VisibleForTesting
+  Set<Pair<Protos.OfferID, TaskGroupKey>> getStaticBans() {
+    return hostOffers.getStaticBans();
+  }
+
+  /**
+   * Exclude an offer that results in a static mismatch from further attempts to match against all
+   * tasks from the same group.
+   *
+   * @param offerId ID of the offer to ban.
+   * @param groupKey Task group the offer is banned for.
+   */
+  @VisibleForTesting
+  void banForTaskGroup(Protos.OfferID offerId, TaskGroupKey groupKey) {
+    hostOffers.addStaticGroupBan(offerId, groupKey);
+  }
+
+  /**
+   * Used for testing to ensure that the underlying cache's `size` method returns an accurate
+   * value by not including evicted entries.
+   */
+  @VisibleForTesting
+  void cleanupStaticBans() {
+    hostOffers.cleanUpStaticallyBannedOffers();
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerModule.java b/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerModule.java
new file mode 100644
index 0000000..e2e3628
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferManagerModule.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.offers;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+import java.util.List;
+
+import javax.inject.Qualifier;
+import javax.inject.Singleton;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.base.Supplier;
+import com.google.common.base.Ticker;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Ordering;
+import com.google.inject.AbstractModule;
+import com.google.inject.Module;
+import com.google.inject.PrivateModule;
+import com.google.inject.Provides;
+import com.google.inject.TypeLiteral;
+
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.util.Random;
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.app.MoreModules;
+import org.apache.aurora.scheduler.config.CliOptions;
+import org.apache.aurora.scheduler.config.types.TimeAmount;
+import org.apache.aurora.scheduler.config.validators.NotNegativeAmount;
+import org.apache.aurora.scheduler.config.validators.NotNegativeNumber;
+import org.apache.aurora.scheduler.events.PubsubEventModule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+/**
+ * Guice module for resource offer management.
+ * <p>
+ * Installs the configured offer ordering modules, binds the unavailability threshold, and
+ * exposes a singleton {@link OfferManager} whose offer-decline {@link Deferment} depends on
+ * whether offers are held forever.
+ */
+public class OfferManagerModule extends AbstractModule {
+  private static final Logger LOG = LoggerFactory.getLogger(OfferManagerModule.class);
+
+  /**
+   * Command line options controlling how offers are held, ordered and banned.
+   */
+  @Parameters(separators = "=")
+  public static class Options {
+    @Parameter(names = "-hold_offers_forever",
+        description =
+            "Hold resource offers indefinitely, disabling automatic offer decline settings.",
+        arity = 1)
+    public boolean holdOffersForever = false;
+
+    @Parameter(names = "-min_offer_hold_time",
+        validateValueWith = NotNegativeAmount.class,
+        description = "Minimum amount of time to hold a resource offer before declining.")
+    public TimeAmount minOfferHoldTime = new TimeAmount(5, Time.MINUTES);
+
+    @Parameter(names = "-offer_hold_jitter_window",
+        validateValueWith = NotNegativeAmount.class,
+        description = "Maximum amount of random jitter to add to the offer hold time window.")
+    public TimeAmount offerHoldJitterWindow = new TimeAmount(1, Time.MINUTES);
+
+    @Parameter(names = "-offer_filter_duration",
+        description =
+            "Duration after which we expect Mesos to re-offer unused resources. A short duration "
+                + "improves scheduling performance in smaller clusters, but might lead to resource "
+                + "starvation for other frameworks if you run many frameworks in your cluster.")
+    public TimeAmount offerFilterDuration = new TimeAmount(5, Time.SECONDS);
+
+    @Parameter(names = "-unavailability_threshold",
+        description =
+            "Threshold time, when running tasks should be drained from a host, before a host "
+                + "becomes unavailable. Should be greater than min_offer_hold_time + "
+                + "offer_hold_jitter_window.")
+    public TimeAmount unavailabilityThreshold = new TimeAmount(6, Time.MINUTES);
+
+    @Parameter(names = "-offer_order",
+        description =
+            "Iteration order for offers, to influence task scheduling. Multiple orderings will be "
+                + "compounded together. E.g. CPU,MEMORY,RANDOM would sort first by cpus offered,"
+                + " then memory and finally would randomize any equal offers.")
+    public List<OfferOrder> offerOrder = ImmutableList.of(OfferOrder.RANDOM);
+
+    @Parameter(names = "-offer_order_modules",
+        description = "Custom Guice module to provide an offer ordering.")
+    @SuppressWarnings("rawtypes")
+    public List<Class> offerOrderModules = ImmutableList.of(OfferOrderModule.class);
+
+    @Parameter(names = "-offer_static_ban_cache_max_size",
+        validateValueWith = NotNegativeNumber.class,
+        description =
+            "The number of offers to hold in the static ban cache. If no value is specified, "
+                + "the cache will grow indefinitely. However, entries will expire within "
+                + "'min_offer_hold_time' + 'offer_hold_jitter_window' of being written.")
+    public long offerStaticBanCacheMaxSize = Long.MAX_VALUE;
+  }
+
+  /**
+   * Binding annotation for the threshold to veto tasks with unavailability.
+   */
+  @Qualifier
+  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+  public @interface UnavailabilityThreshold { }
+
+  /**
+   * Default entry of {@code -offer_order_modules}: binds the {@code Ordering<HostOffer>} built
+   * from the {@code -offer_order} option.
+   */
+  public static class OfferOrderModule extends AbstractModule {
+    private final CliOptions options;
+
+    public OfferOrderModule(CliOptions options) {
+      this.options = options;
+    }
+
+    @Override
+    protected void configure() {
+      Ordering<HostOffer> configuredOrdering = OfferOrderBuilder.create(options.offer.offerOrder);
+      bind(new TypeLiteral<Ordering<HostOffer>>() { }).toInstance(configuredOrdering);
+    }
+  }
+
+  private final CliOptions cliOptions;
+
+  public OfferManagerModule(CliOptions cliOptions) {
+    this.cliOptions = cliOptions;
+  }
+
+  @Override
+  protected void configure() {
+    final Options options = cliOptions.offer;
+    warnIfUnavailabilityThresholdTooLow(options);
+
+    for (Module orderModule : MoreModules.instantiateAll(options.offerOrderModules, cliOptions)) {
+      install(orderModule);
+    }
+
+    bind(new TypeLiteral<Amount<Long, Time>>() { })
+        .annotatedWith(UnavailabilityThreshold.class)
+        .toInstance(options.unavailabilityThreshold);
+
+    install(offerManagerBindings(options));
+    PubsubEventModule.bindSubscriber(binder(), OfferManager.class);
+  }
+
+  /**
+   * Logs a warning when offers are held for a finite time and the unavailability threshold is
+   * shorter than the longest possible hold time (minimum hold time plus jitter window).
+   */
+  private static void warnIfUnavailabilityThresholdTooLow(Options options) {
+    if (options.holdOffersForever) {
+      return;
+    }
+
+    long maxHoldSecs = options.minOfferHoldTime.as(Time.SECONDS)
+        + options.offerHoldJitterWindow.as(Time.SECONDS);
+    if (options.unavailabilityThreshold.as(Time.SECONDS) >= maxHoldSecs) {
+      return;
+    }
+
+    LOG.warn("unavailability_threshold ({}) is less than the sum of min_offer_hold_time ({})"
+            + " and offer_hold_jitter_window ({}). This creates risks of races between "
+            + "launching and draining",
+        options.unavailabilityThreshold,
+        options.minOfferHoldTime,
+        options.offerHoldJitterWindow);
+  }
+
+  /**
+   * Builds the private module that binds the offer-decline {@link Deferment} and exposes only
+   * the singleton {@link OfferManager}.
+   */
+  private Module offerManagerBindings(final Options options) {
+    return new PrivateModule() {
+      @Override
+      protected void configure() {
+        if (!options.holdOffersForever) {
+          // Finite hold: decline after the minimum hold time plus a random jitter.
+          bind(new TypeLiteral<Supplier<Amount<Long, Time>>>() { }).toInstance(
+              new RandomJitterReturnDelay(
+                  options.minOfferHoldTime.as(Time.MILLISECONDS),
+                  options.offerHoldJitterWindow.as(Time.MILLISECONDS),
+                  Random.Util.newDefaultRandom()));
+          bind(Deferment.class).to(Deferment.DelayedDeferment.class);
+        } else {
+          bind(Deferment.class).to(Deferment.Noop.class);
+        }
+
+        bind(OfferManager.class).to(OfferManagerImpl.class);
+        bind(OfferManagerImpl.class).in(Singleton.class);
+        expose(OfferManager.class);
+      }
+    };
+  }
+
+  @Provides
+  @Singleton
+  OfferSettings provideOfferSettings(Ordering<HostOffer> offerOrdering) {
+    // The static ban cache in OfferManager is evicted on two criteria: a maximum size and the
+    // length of time an offer stays valid. This covers both single- and multi-framework
+    // environments. When offers are held for a finite duration, cache entries can expire after
+    // the maximum offer hold time, since an offer is never valid for longer than that; cluster
+    // operators will most likely not have to worry about cache size in that case, as it mimics
+    // the previous behavior. When offers are held indefinitely, cache entries never expire, but
+    // the cluster operator can specify a maximum size to avoid a memory leak.
+    Options offerOptions = cliOptions.offer;
+    long maxOfferHoldTime = offerOptions.holdOffersForever
+        ? Long.MAX_VALUE
+        : offerOptions.minOfferHoldTime.as(Time.SECONDS)
+            + offerOptions.offerHoldJitterWindow.as(Time.SECONDS);
+
+    return new OfferSettings(
+        offerOptions.offerFilterDuration,
+        offerOrdering,
+        Amount.of(maxOfferHoldTime, Time.SECONDS),
+        offerOptions.offerStaticBanCacheMaxSize,
+        Ticker.systemTicker());
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/offers/OfferSettings.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OfferSettings.java b/src/main/java/org/apache/aurora/scheduler/offers/OfferSettings.java
index 57fc1a1..838a319 100644
--- a/src/main/java/org/apache/aurora/scheduler/offers/OfferSettings.java
+++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferSettings.java
@@ -29,6 +29,7 @@ import static java.util.Objects.requireNonNull;
/**
* Settings required to create an OfferManager.
*/
+@VisibleForTesting
public class OfferSettings {
private final Amount<Long, Time> filterDuration;
@@ -67,14 +68,14 @@ public class OfferSettings {
/**
* Duration after which we want Mesos to re-offer unused or declined resources.
*/
- public Amount<Long, Time> getFilterDuration() {
+ Amount<Long, Time> getFilterDuration() {
return filterDuration;
}
/**
* The ordering to use when fetching offers from OfferManager.
*/
- public Ordering<HostOffer> getOrdering() {
+ Ordering<HostOffer> getOrdering() {
return ordering;
}
@@ -82,7 +83,7 @@ public class OfferSettings {
* The builder for the static ban cache. Cache settings (e.g. max size, entry expiration) should
* already be added to the builder by this point.
*/
- public CacheBuilder<Object, Object> getStaticBanCacheBuilder() {
+ CacheBuilder<Object, Object> getStaticBanCacheBuilder() {
return staticBanCacheBuilder;
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/offers/OffersModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OffersModule.java b/src/main/java/org/apache/aurora/scheduler/offers/OffersModule.java
deleted file mode 100644
index 4a6ea8d..0000000
--- a/src/main/java/org/apache/aurora/scheduler/offers/OffersModule.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.offers;
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-import java.util.List;
-
-import javax.inject.Qualifier;
-import javax.inject.Singleton;
-
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.Parameters;
-import com.google.common.base.Supplier;
-import com.google.common.base.Ticker;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Ordering;
-import com.google.inject.AbstractModule;
-import com.google.inject.Module;
-import com.google.inject.PrivateModule;
-import com.google.inject.Provides;
-import com.google.inject.TypeLiteral;
-
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.common.util.Random;
-import org.apache.aurora.scheduler.HostOffer;
-import org.apache.aurora.scheduler.app.MoreModules;
-import org.apache.aurora.scheduler.config.CliOptions;
-import org.apache.aurora.scheduler.config.types.TimeAmount;
-import org.apache.aurora.scheduler.config.validators.NotNegativeAmount;
-import org.apache.aurora.scheduler.config.validators.NotNegativeNumber;
-import org.apache.aurora.scheduler.events.PubsubEventModule;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-/**
- * Binding module for resource offer management.
- */
-public class OffersModule extends AbstractModule {
- private static final Logger LOG = LoggerFactory.getLogger(OffersModule.class);
-
- @Parameters(separators = "=")
- public static class Options {
- @Parameter(names = "-hold_offers_forever",
- description =
- "Hold resource offers indefinitely, disabling automatic offer decline settings.",
- arity = 1)
- public boolean holdOffersForever = false;
-
- @Parameter(names = "-min_offer_hold_time",
- validateValueWith = NotNegativeAmount.class,
- description = "Minimum amount of time to hold a resource offer before declining.")
- public TimeAmount minOfferHoldTime = new TimeAmount(5, Time.MINUTES);
-
- @Parameter(names = "-offer_hold_jitter_window",
- validateValueWith = NotNegativeAmount.class,
- description = "Maximum amount of random jitter to add to the offer hold time window.")
- public TimeAmount offerHoldJitterWindow = new TimeAmount(1, Time.MINUTES);
-
- @Parameter(names = "-offer_filter_duration",
- description =
- "Duration after which we expect Mesos to re-offer unused resources. A short duration "
- + "improves scheduling performance in smaller clusters, but might lead to resource "
- + "starvation for other frameworks if you run many frameworks in your cluster.")
- public TimeAmount offerFilterDuration = new TimeAmount(5, Time.SECONDS);
-
- @Parameter(names = "-unavailability_threshold",
- description =
- "Threshold time, when running tasks should be drained from a host, before a host "
- + "becomes unavailable. Should be greater than min_offer_hold_time + "
- + "offer_hold_jitter_window.")
- public TimeAmount unavailabilityThreshold = new TimeAmount(6, Time.MINUTES);
-
- @Parameter(names = "-offer_order",
- description =
- "Iteration order for offers, to influence task scheduling. Multiple orderings will be "
- + "compounded together. E.g. CPU,MEMORY,RANDOM would sort first by cpus offered,"
- + " then memory and finally would randomize any equal offers.")
- public List<OfferOrder> offerOrder = ImmutableList.of(OfferOrder.RANDOM);
-
- @Parameter(names = "-offer_order_modules",
- description = "Custom Guice module to provide an offer ordering.")
- @SuppressWarnings("rawtypes")
- public List<Class> offerOrderModules = ImmutableList.of(OfferOrderModule.class);
-
- @Parameter(names = "-offer_static_ban_cache_max_size",
- validateValueWith = NotNegativeNumber.class,
- description =
- "The number of offers to hold in the static ban cache. If no value is specified, "
- + "the cache will grow indefinitely. However, entries will expire within "
- + "'min_offer_hold_time' + 'offer_hold_jitter_window' of being written.")
- public long offerStaticBanCacheMaxSize = Long.MAX_VALUE;
- }
-
- public static class OfferOrderModule extends AbstractModule {
- private final CliOptions options;
-
- public OfferOrderModule(CliOptions options) {
- this.options = options;
- }
-
- @Override
- protected void configure() {
- bind(new TypeLiteral<Ordering<HostOffer>>() { })
- .toInstance(OfferOrderBuilder.create(options.offer.offerOrder));
- }
- }
-
- /**
- * Binding annotation for the threshold to veto tasks with unavailability.
- */
- @Qualifier
- @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
- public @interface UnavailabilityThreshold { }
-
- private final CliOptions cliOptions;
-
- public OffersModule(CliOptions cliOptions) {
- this.cliOptions = cliOptions;
- }
-
- @Override
- protected void configure() {
- Options options = cliOptions.offer;
- if (!options.holdOffersForever) {
- long offerHoldTime = options.offerHoldJitterWindow.as(Time.SECONDS)
- + options.minOfferHoldTime.as(Time.SECONDS);
- if (options.unavailabilityThreshold.as(Time.SECONDS) < offerHoldTime) {
- LOG.warn("unavailability_threshold ({}) is less than the sum of min_offer_hold_time ({})"
- + " and offer_hold_jitter_window ({}). This creates risks of races between "
- + "launching and draining",
- options.unavailabilityThreshold,
- options.minOfferHoldTime,
- options.offerHoldJitterWindow);
- }
- }
-
- for (Module module: MoreModules.instantiateAll(options.offerOrderModules, cliOptions)) {
- install(module);
- }
-
- bind(new TypeLiteral<Amount<Long, Time>>() { })
- .annotatedWith(UnavailabilityThreshold.class)
- .toInstance(options.unavailabilityThreshold);
-
- install(new PrivateModule() {
- @Override
- protected void configure() {
- if (options.holdOffersForever) {
- bind(Deferment.class).to(Deferment.Noop.class);
- } else {
- bind(new TypeLiteral<Supplier<Amount<Long, Time>>>() { }).toInstance(
- new RandomJitterReturnDelay(
- options.minOfferHoldTime.as(Time.MILLISECONDS),
- options.offerHoldJitterWindow.as(Time.MILLISECONDS),
- Random.Util.newDefaultRandom()));
- bind(Deferment.class).to(Deferment.DelayedDeferment.class);
- }
-
- bind(OfferManager.class).to(OfferManager.OfferManagerImpl.class);
- bind(OfferManager.OfferManagerImpl.class).in(Singleton.class);
- expose(OfferManager.class);
- }
- });
- PubsubEventModule.bindSubscriber(binder(), OfferManager.class);
- }
-
- @Provides
- @Singleton
- OfferSettings provideOfferSettings(Ordering<HostOffer> offerOrdering) {
- // We have a dual eviction strategy for the static ban cache in OfferManager that is based on
- // both maximum size of the cache and the length an offer is valid. We do this in order to
- // satisfy requirements in both single- and multi-framework environments. If offers are held for
- // a finite duration, then we can expire cache entries after offerMaxHoldTime since that is the
- // longest it will be valid for. Additionally, cluster operators will most likely not have to
- // worry about cache size in this case as this behavior mimics current behavior. If offers are
- // held indefinitely, then we never expire cache entries but the cluster operator can specify a
- // maximum size to avoid a memory leak.
- long maxOfferHoldTime;
- if (cliOptions.offer.holdOffersForever) {
- maxOfferHoldTime = Long.MAX_VALUE;
- } else {
- maxOfferHoldTime = cliOptions.offer.minOfferHoldTime.as(Time.SECONDS)
- + cliOptions.offer.offerHoldJitterWindow.as(Time.SECONDS);
- }
-
- return new OfferSettings(
- cliOptions.offer.offerFilterDuration,
- offerOrdering,
- Amount.of(maxOfferHoldTime, Time.SECONDS),
- cliOptions.offer.offerStaticBanCacheMaxSize,
- Ticker.systemTicker());
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java
index 766d3b2..497a766 100644
--- a/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java
+++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java
@@ -143,7 +143,7 @@ public class PendingTaskProcessor implements Runnable {
// Group the offers by slave id so they can be paired with active tasks from the same slave.
Map<String, HostOffer> slavesToOffers =
- Maps.uniqueIndex(offerManager.getOffers(), OFFER_TO_SLAVE_ID);
+ Maps.uniqueIndex(offerManager.getAll(), OFFER_TO_SLAVE_ID);
Set<String> allSlaves = Sets.newHashSet(Iterables.concat(
slavesToOffers.keySet(),
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/main/java/org/apache/aurora/scheduler/preemptor/Preemptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/Preemptor.java b/src/main/java/org/apache/aurora/scheduler/preemptor/Preemptor.java
index 82a0ff6..ffb8b90 100644
--- a/src/main/java/org/apache/aurora/scheduler/preemptor/Preemptor.java
+++ b/src/main/java/org/apache/aurora/scheduler/preemptor/Preemptor.java
@@ -94,7 +94,7 @@ public interface Preemptor {
pendingTask.getTask(),
slot.getVictims(),
jobState,
- offerManager.getOffer(slaveId),
+ offerManager.get(slaveId),
store);
metrics.recordSlotValidationResult(validatedVictims);