You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/11/13 01:10:27 UTC
[1/2] incubator-aurora git commit: Store host attributes alongside
offers to reduce number of lookups.
Repository: incubator-aurora
Updated Branches:
refs/heads/master 2e2c4b37f -> b80e69c9b
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
index 7cf5d3e..4170902 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerTest.java
@@ -33,6 +33,7 @@ import com.twitter.common.util.BackoffStrategy;
import com.twitter.common.util.testing.FakeClock;
import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.Attribute;
import org.apache.aurora.gen.HostAttributes;
import org.apache.aurora.gen.Identity;
import org.apache.aurora.gen.JobKey;
@@ -40,6 +41,7 @@ import org.apache.aurora.gen.MaintenanceMode;
import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.HostOffer;
import org.apache.aurora.scheduler.async.OfferQueue.OfferQueueImpl;
import org.apache.aurora.scheduler.async.OfferQueue.OfferReturnDelay;
import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl;
@@ -62,7 +64,6 @@ import org.apache.aurora.scheduler.storage.TaskStore;
import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.mem.MemStorage;
-import org.apache.mesos.Protos.Offer;
import org.apache.mesos.Protos.OfferID;
import org.apache.mesos.Protos.SlaveID;
import org.apache.mesos.Protos.TaskID;
@@ -72,13 +73,17 @@ import org.easymock.EasyMock;
import org.junit.Before;
import org.junit.Test;
+import static org.apache.aurora.gen.MaintenanceMode.DRAINED;
+import static org.apache.aurora.gen.MaintenanceMode.DRAINING;
+import static org.apache.aurora.gen.MaintenanceMode.NONE;
+import static org.apache.aurora.gen.MaintenanceMode.SCHEDULED;
import static org.apache.aurora.gen.ScheduleStatus.FINISHED;
import static org.apache.aurora.gen.ScheduleStatus.INIT;
import static org.apache.aurora.gen.ScheduleStatus.KILLED;
import static org.apache.aurora.gen.ScheduleStatus.LOST;
import static org.apache.aurora.gen.ScheduleStatus.PENDING;
import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
-import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer;
+import static org.apache.mesos.Protos.Offer;
import static org.easymock.EasyMock.capture;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
@@ -93,14 +98,10 @@ public class TaskSchedulerTest extends EasyMockTest {
private static final long FIRST_SCHEDULE_DELAY_MS = 1L;
- private static final HostOffer OFFER_A =
- new HostOffer(Offers.makeOffer("OFFER_A", "HOST_A"), MaintenanceMode.NONE);
- private static final HostOffer OFFER_B =
- new HostOffer(Offers.makeOffer("OFFER_B", "HOST_B"), MaintenanceMode.SCHEDULED);
- private static final HostOffer OFFER_C =
- new HostOffer(Offers.makeOffer("OFFER_C", "HOST_C"), MaintenanceMode.DRAINING);
- private static final HostOffer OFFER_D =
- new HostOffer(Offers.makeOffer("OFFER_D", "HOST_D"), MaintenanceMode.DRAINED);
+ private static final HostOffer OFFER_A = makeOffer("OFFER_A", "HOST_A", NONE);
+ private static final HostOffer OFFER_B = makeOffer("OFFER_B", "HOST_B", SCHEDULED);
+ private static final HostOffer OFFER_C = makeOffer("OFFER_C", "HOST_C", DRAINING);
+ private static final HostOffer OFFER_D = makeOffer("OFFER_D", "HOST_D", DRAINED);
private Storage storage;
@@ -150,7 +151,7 @@ public class TaskSchedulerTest extends EasyMockTest {
capture(cacheSizeSupplier))).andReturn(stat);
control.replay();
- offerQueue = new OfferQueueImpl(driver, returnDelay, executor, maintenance);
+ offerQueue = new OfferQueueImpl(driver, returnDelay, executor);
TaskScheduler scheduler = new TaskSchedulerImpl(storage,
stateManager,
assigner,
@@ -173,10 +174,10 @@ public class TaskSchedulerTest extends EasyMockTest {
return expectOfferDeclineIn(10);
}
- private Capture<Runnable> expectOfferDeclineIn(int delayMillis) {
+ private Capture<Runnable> expectOfferDeclineIn(long delayMillis) {
expect(returnDelay.get()).andReturn(Amount.of(delayMillis, Time.MILLISECONDS));
Capture<Runnable> runnable = createCapture();
- executor.schedule(capture(runnable), eq((long) delayMillis), eq(TimeUnit.MILLISECONDS));
+ executor.schedule(capture(runnable), eq(delayMillis), eq(TimeUnit.MILLISECONDS));
expectLastCall().andReturn(createMock(ScheduledFuture.class));
return runnable;
}
@@ -223,8 +224,8 @@ public class TaskSchedulerTest extends EasyMockTest {
replayAndCreateScheduler();
- offerQueue.addOffer(OFFER_A.getOffer());
- offerQueue.addOffer(OFFER_B.getOffer());
+ offerQueue.addOffer(OFFER_A);
+ offerQueue.addOffer(OFFER_B);
}
@Test
@@ -310,7 +311,7 @@ public class TaskSchedulerTest extends EasyMockTest {
replayAndCreateScheduler();
- offerQueue.addOffer(OFFER_A.getOffer());
+ offerQueue.addOffer(OFFER_A);
changeState(task, INIT, PENDING);
timeoutCapture.getValue().run();
timeoutCapture2.getValue().run();
@@ -345,7 +346,7 @@ public class TaskSchedulerTest extends EasyMockTest {
replayAndCreateScheduler();
changeState(task, INIT, PENDING);
- offerQueue.addOffer(OFFER_A.getOffer());
+ offerQueue.addOffer(OFFER_A);
timeoutCapture.getValue().run();
}
@@ -372,7 +373,7 @@ public class TaskSchedulerTest extends EasyMockTest {
replayAndCreateScheduler();
changeState(task, INIT, PENDING);
- offerQueue.addOffer(OFFER_A.getOffer());
+ offerQueue.addOffer(OFFER_A);
timeoutCapture.getValue().run();
timeoutCapture2.getValue().run();
}
@@ -394,7 +395,7 @@ public class TaskSchedulerTest extends EasyMockTest {
replayAndCreateScheduler();
changeState(task, INIT, PENDING);
- offerQueue.addOffer(OFFER_A.getOffer());
+ offerQueue.addOffer(OFFER_A);
timeoutCapture.getValue().run();
offerExpirationCapture.getValue().run();
timeoutCapture2.getValue().run();
@@ -405,15 +406,16 @@ public class TaskSchedulerTest extends EasyMockTest {
expectAnyMaintenanceCalls();
Capture<Runnable> offerExpirationCapture = expectOfferDeclineIn(10);
- Offer offerAB =
- Offers.makeOffer("OFFER_B").toBuilder().setSlaveId(OFFER_A.getOffer().getSlaveId()).build();
+ HostOffer offerAB = new HostOffer(
+ Offers.makeOffer("OFFER_B").toBuilder().setSlaveId(OFFER_A.getOffer().getSlaveId()).build(),
+ IHostAttributes.build(new HostAttributes()));
driver.declineOffer(OFFER_A.getOffer().getId());
- driver.declineOffer(offerAB.getId());
+ driver.declineOffer(offerAB.getOffer().getId());
replayAndCreateScheduler();
- offerQueue.addOffer(OFFER_A.getOffer());
+ offerQueue.addOffer(OFFER_A);
offerQueue.addOffer(offerAB);
offerExpirationCapture.getValue().run();
}
@@ -431,7 +433,7 @@ public class TaskSchedulerTest extends EasyMockTest {
replayAndCreateScheduler();
- offerQueue.addOffer(OFFER_A.getOffer());
+ offerQueue.addOffer(OFFER_A);
offerQueue.launchFirst(offerAcceptor);
offerExpirationCapture.getValue().run();
}
@@ -439,13 +441,9 @@ public class TaskSchedulerTest extends EasyMockTest {
@Test
public void testBasicMaintenancePreferences() {
expectOffer();
- expect(maintenance.getMode("HOST_D")).andReturn(OFFER_D.getMode());
expectOffer();
- expect(maintenance.getMode("HOST_C")).andReturn(OFFER_C.getMode());
expectOffer();
- expect(maintenance.getMode("HOST_B")).andReturn(OFFER_B.getMode());
expectOffer();
- expect(maintenance.getMode("HOST_A")).andReturn(OFFER_A.getMode());
IScheduledTask taskA = makeTask("A", PENDING);
TaskInfo mesosTaskA = makeTaskInfo(taskA);
@@ -461,10 +459,10 @@ public class TaskSchedulerTest extends EasyMockTest {
replayAndCreateScheduler();
- offerQueue.addOffer(OFFER_D.getOffer());
- offerQueue.addOffer(OFFER_C.getOffer());
- offerQueue.addOffer(OFFER_B.getOffer());
- offerQueue.addOffer(OFFER_A.getOffer());
+ offerQueue.addOffer(OFFER_D);
+ offerQueue.addOffer(OFFER_C);
+ offerQueue.addOffer(OFFER_B);
+ offerQueue.addOffer(OFFER_A);
changeState(taskA, INIT, PENDING);
captureA.getValue().run();
@@ -476,11 +474,8 @@ public class TaskSchedulerTest extends EasyMockTest {
@Test
public void testChangingMaintenancePreferences() {
expectOffer();
- expect(maintenance.getMode("HOST_A")).andReturn(OFFER_A.getMode());
expectOffer();
- expect(maintenance.getMode("HOST_B")).andReturn(OFFER_B.getMode());
expectOffer();
- expect(maintenance.getMode("HOST_C")).andReturn(OFFER_C.getMode());
IScheduledTask taskA = makeTask("A", PENDING);
TaskInfo mesosTaskA = makeTaskInfo(taskA);
@@ -490,27 +485,28 @@ public class TaskSchedulerTest extends EasyMockTest {
IScheduledTask taskB = makeTask("B", PENDING);
TaskInfo mesosTaskB = makeTaskInfo(taskB);
- HostOffer updatedOfferC =
- new HostOffer(OFFER_C.getOffer(), MaintenanceMode.NONE);
+ HostOffer updatedOfferC = new HostOffer(
+ OFFER_C.getOffer(),
+ IHostAttributes.build(OFFER_C.getAttributes().newBuilder().setMode(NONE)));
expect(assigner.maybeAssign(updatedOfferC, taskB, emptyJob)).andReturn(Optional.of(mesosTaskB));
driver.launchTask(OFFER_C.getOffer().getId(), mesosTaskB);
Capture<Runnable> captureB = expectTaskRetryIn(FIRST_SCHEDULE_DELAY_MS);
replayAndCreateScheduler();
- offerQueue.addOffer(OFFER_A.getOffer());
- offerQueue.addOffer(OFFER_B.getOffer());
- offerQueue.addOffer(OFFER_C.getOffer());
+ offerQueue.addOffer(OFFER_A);
+ offerQueue.addOffer(OFFER_B);
+ offerQueue.addOffer(OFFER_C);
// Initially, we'd expect the offers to be consumed in order (A, B), with (C) unschedulable
// Expected order now (B), with (C, A) unschedulable
- changeHostMaintenanceState("HOST_A", MaintenanceMode.DRAINING);
+ changeHostMaintenanceState(OFFER_A.getAttributes(), DRAINING);
changeState(taskA, INIT, PENDING);
captureA.getValue().run();
// Expected order now (C), with (A) unschedulable and (B) already consumed
- changeHostMaintenanceState("HOST_C", MaintenanceMode.NONE);
+ changeHostMaintenanceState(OFFER_C.getAttributes(), NONE);
changeState(taskB, INIT, PENDING);
captureB.getValue().run();
}
@@ -564,10 +560,10 @@ public class TaskSchedulerTest extends EasyMockTest {
replayAndCreateScheduler();
- offerQueue.addOffer(OFFER_A.getOffer());
- offerQueue.addOffer(OFFER_B.getOffer());
- offerQueue.addOffer(OFFER_C.getOffer());
- offerQueue.addOffer(OFFER_D.getOffer());
+ offerQueue.addOffer(OFFER_A);
+ offerQueue.addOffer(OFFER_B);
+ offerQueue.addOffer(OFFER_C);
+ offerQueue.addOffer(OFFER_D);
changeState(jobA0, INIT, PENDING);
changeState(jobA1, INIT, PENDING);
changeState(jobA2, INIT, PENDING);
@@ -593,7 +589,7 @@ public class TaskSchedulerTest extends EasyMockTest {
replayAndCreateScheduler();
- offerQueue.addOffer(OFFER_A.getOffer());
+ offerQueue.addOffer(OFFER_A);
changeState(task, INIT, PENDING);
timeoutCapture.getValue().run();
@@ -618,11 +614,22 @@ public class TaskSchedulerTest extends EasyMockTest {
}
private void expectAnyMaintenanceCalls() {
- expect(maintenance.getMode(isA(String.class))).andReturn(MaintenanceMode.NONE).anyTimes();
+ expect(maintenance.getMode(isA(String.class))).andReturn(NONE).anyTimes();
}
- private void changeHostMaintenanceState(String hostName, MaintenanceMode mode) {
+ private void changeHostMaintenanceState(IHostAttributes attributes, MaintenanceMode mode) {
offerQueue.hostAttributesChanged(new PubsubEvent.HostAttributesChanged(
- IHostAttributes.build(new HostAttributes().setHost(hostName).setMode(mode))));
+ IHostAttributes.build(attributes.newBuilder().setMode(mode))));
+ }
+
+ private static HostOffer makeOffer(String offerId, String hostName, MaintenanceMode mode) {
+ Offer offer = Offers.makeOffer(offerId, hostName);
+ return new HostOffer(
+ offer,
+ IHostAttributes.build(new HostAttributes()
+ .setHost(hostName)
+ .setSlaveId(offer.getSlaveId().getValue())
+ .setAttributes(ImmutableSet.<Attribute>of())
+ .setMode(mode)));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java b/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
index 0318179..94f0a17 100644
--- a/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilterTest.java
@@ -19,6 +19,7 @@ import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableSet;
import com.twitter.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.gen.HostAttributes;
import org.apache.aurora.gen.MaintenanceMode;
import org.apache.aurora.gen.TaskConfig;
import org.apache.aurora.scheduler.ResourceSlot;
@@ -27,6 +28,7 @@ import org.apache.aurora.scheduler.filter.AttributeAggregate;
import org.apache.aurora.scheduler.filter.SchedulingFilter;
import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
import org.apache.aurora.scheduler.storage.AttributeStore;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
import org.junit.Before;
@@ -43,8 +45,8 @@ public class NotifyingSchedulingFilterTest extends EasyMockTest {
.setDiskMb(1024));
private static final ResourceSlot TASK_RESOURCES = ResourceSlot.from(TASK);
private static final String TASK_ID = "taskId";
- private static final String SLAVE = "slaveHost";
- private static final MaintenanceMode MODE = MaintenanceMode.NONE;
+ private static final IHostAttributes ATTRIBUTES =
+ IHostAttributes.build(new HostAttributes().setHost("host").setMode(MaintenanceMode.NONE));
private static final Veto VETO_1 = new Veto("veto1", 1);
private static final Veto VETO_2 = new Veto("veto2", 2);
@@ -67,21 +69,21 @@ public class NotifyingSchedulingFilterTest extends EasyMockTest {
@Test
public void testEvents() {
Set<Veto> vetoes = ImmutableSet.of(VETO_1, VETO_2);
- expect(delegate.filter(TASK_RESOURCES, SLAVE, MODE, TASK, TASK_ID, emptyJob)).andReturn(vetoes);
+ expect(delegate.filter(TASK_RESOURCES, ATTRIBUTES, TASK, TASK_ID, emptyJob)).andReturn(vetoes);
eventSink.post(new Vetoed(TASK_ID, vetoes));
control.replay();
- assertEquals(vetoes, filter.filter(TASK_RESOURCES, SLAVE, MODE, TASK, TASK_ID, emptyJob));
+ assertEquals(vetoes, filter.filter(TASK_RESOURCES, ATTRIBUTES, TASK, TASK_ID, emptyJob));
}
@Test
public void testNoVetoes() {
Set<Veto> vetoes = ImmutableSet.of();
- expect(delegate.filter(TASK_RESOURCES, SLAVE, MODE, TASK, TASK_ID, emptyJob)).andReturn(vetoes);
+ expect(delegate.filter(TASK_RESOURCES, ATTRIBUTES, TASK, TASK_ID, emptyJob)).andReturn(vetoes);
control.replay();
- assertEquals(vetoes, filter.filter(TASK_RESOURCES, SLAVE, MODE, TASK, TASK_ID, emptyJob));
+ assertEquals(vetoes, filter.filter(TASK_RESOURCES, ATTRIBUTES, TASK, TASK_ID, emptyJob));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
index bffbf83..e113eba 100644
--- a/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/filter/SchedulingFilterImplTest.java
@@ -98,7 +98,6 @@ public class SchedulingFilterImplTest extends EasyMockTest {
Amount.of(DEFAULT_RAM, Data.MB),
Amount.of(DEFAULT_DISK, Data.MB),
0);
- private static final MaintenanceMode DEFAULT_MODE = MaintenanceMode.NONE;
private AttributeAggregate emptyJob;
@@ -110,9 +109,9 @@ public class SchedulingFilterImplTest extends EasyMockTest {
private AttributeStore.Mutable attributeStore;
@Before
- public void setUp() throws Exception {
+ public void setUp() {
storage = createMock(Storage.class);
- defaultFilter = new SchedulingFilterImpl(storage);
+ defaultFilter = new SchedulingFilterImpl();
storeProvider = createMock(StoreProvider.class);
attributeStore = createMock(AttributeStore.Mutable.class);
emptyJob = new AttributeAggregate(
@@ -125,11 +124,11 @@ public class SchedulingFilterImplTest extends EasyMockTest {
expect(storeProvider.getAttributeStore()).andReturn(attributeStore).anyTimes();
}
- private void expectReads() throws Exception {
+ private void expectReads() {
expect(storage.weaklyConsistentRead(EasyMock.<Quiet<Object>>anyObject()))
.andAnswer(new IAnswer<Object>() {
@Override
- public Object answer() throws Exception {
+ public Object answer() {
Quiet<?> arg = (Quiet<?>) EasyMock.getCurrentArguments()[0];
return arg.apply(storeProvider);
}
@@ -137,19 +136,18 @@ public class SchedulingFilterImplTest extends EasyMockTest {
}
@Test
- public void testMeetsOffer() throws Exception {
- expectGetHostAttributes(HOST_A, host(HOST_A), rack(RACK_A)).atLeastOnce();
-
+ public void testMeetsOffer() {
control.replay();
- assertNoVetoes(makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK));
- assertNoVetoes(makeTask(DEFAULT_CPUS - 1, DEFAULT_RAM - 1, DEFAULT_DISK - 1), emptyJob);
+ IHostAttributes attributes = hostAttributes(HOST_A, host(HOST_A), rack(RACK_A));
+ assertNoVetoes(makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK), attributes);
+ assertNoVetoes(
+ makeTask(DEFAULT_CPUS - 1, DEFAULT_RAM - 1, DEFAULT_DISK - 1),
+ attributes);
}
@Test
- public void testSufficientPorts() throws Exception {
- expectGetHostAttributes(HOST_A, host(HOST_A), rack(RACK_A)).atLeastOnce();
-
+ public void testSufficientPorts() {
control.replay();
ResourceSlot twoPorts = ResourceSlot.from(
@@ -172,115 +170,111 @@ public class SchedulingFilterImplTest extends EasyMockTest {
.setRequestedPorts(ImmutableSet.of("one", "two", "three")));
Set<Veto> none = ImmutableSet.of();
+ IHostAttributes hostA = hostAttributes(HOST_A, host(HOST_A), rack(RACK_A));
assertEquals(none,
- defaultFilter.filter(twoPorts, HOST_A, DEFAULT_MODE, noPortTask, TASK_ID, emptyJob));
+ defaultFilter.filter(twoPorts, hostA, noPortTask, TASK_ID, emptyJob));
assertEquals(none,
- defaultFilter.filter(twoPorts, HOST_A, DEFAULT_MODE, onePortTask, TASK_ID, emptyJob));
+ defaultFilter.filter(twoPorts, hostA, onePortTask, TASK_ID, emptyJob));
assertEquals(none,
- defaultFilter.filter(twoPorts, HOST_A, DEFAULT_MODE, twoPortTask, TASK_ID, emptyJob));
+ defaultFilter.filter(twoPorts, hostA, twoPortTask, TASK_ID, emptyJob));
assertEquals(
ImmutableSet.of(PORTS.veto(1)),
- defaultFilter.filter(twoPorts, HOST_A, DEFAULT_MODE, threePortTask, TASK_ID, emptyJob));
+ defaultFilter.filter(twoPorts, hostA, threePortTask, TASK_ID, emptyJob));
}
@Test
- public void testInsufficientResources() throws Exception {
- expectGetHostAttributes(HOST_A, host(HOST_A), rack(RACK_A)).atLeastOnce();
-
+ public void testInsufficientResources() {
control.replay();
+ IHostAttributes hostA = hostAttributes(HOST_A, host(HOST_A), rack(RACK_A));
assertVetoes(
makeTask(DEFAULT_CPUS + 1, DEFAULT_RAM + 1, DEFAULT_DISK + 1),
- DEFAULT_MODE,
+ hostA,
CPU.veto(1), DISK.veto(1), RAM.veto(1));
- assertVetoes(makeTask(DEFAULT_CPUS + 1, DEFAULT_RAM, DEFAULT_DISK), DEFAULT_MODE, CPU.veto(1));
- assertVetoes(makeTask(DEFAULT_CPUS, DEFAULT_RAM + 1, DEFAULT_DISK), DEFAULT_MODE, RAM.veto(1));
- assertVetoes(makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK + 1), DEFAULT_MODE, DISK.veto(1));
+ assertVetoes(makeTask(DEFAULT_CPUS + 1, DEFAULT_RAM, DEFAULT_DISK), hostA, CPU.veto(1));
+ assertVetoes(makeTask(DEFAULT_CPUS, DEFAULT_RAM + 1, DEFAULT_DISK), hostA, RAM.veto(1));
+ assertVetoes(makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK + 1), hostA, DISK.veto(1));
}
@Test
- public void testDedicatedRole() throws Exception {
- expectGetHostAttributes(HOST_A, dedicated(ROLE_A)).anyTimes();
-
+ public void testDedicatedRole() {
control.replay();
- checkConstraint(HOST_A, DEDICATED_ATTRIBUTE, true, ROLE_A);
- assertVetoes(makeTask(OWNER_B, JOB_B), HOST_A, DEDICATED_HOST_VETO);
+ IHostAttributes hostA = hostAttributes(HOST_A, dedicated(ROLE_A));
+ checkConstraint(hostA, DEDICATED_ATTRIBUTE, true, ROLE_A);
+ assertVetoes(makeTask(OWNER_B, JOB_B), hostA, DEDICATED_HOST_VETO);
}
@Test
- public void testSharedDedicatedHost() throws Exception {
- String dedicated1 = "userA/jobA";
- String dedicated2 = "kestrel/kestrel";
-
- expectGetHostAttributes(HOST_A, dedicated(dedicated1, dedicated2)).anyTimes();
-
+ public void testSharedDedicatedHost() {
control.replay();
- assertNoVetoes(checkConstraint(
- new Identity().setRole("userA"),
- "jobA",
- HOST_A,
- DEDICATED_ATTRIBUTE,
- true,
- dedicated1));
- assertNoVetoes(checkConstraint(
- new Identity().setRole("kestrel"),
- "kestrel",
- HOST_A,
- DEDICATED_ATTRIBUTE,
- true,
- dedicated2));
+ String dedicated1 = "userA/jobA";
+ String dedicated2 = "kestrel/kestrel";
+ IHostAttributes hostA = hostAttributes(HOST_A, dedicated(dedicated1, dedicated2));
+ assertNoVetoes(
+ checkConstraint(
+ new Identity().setRole("userA"),
+ "jobA",
+ hostA,
+ DEDICATED_ATTRIBUTE,
+ true,
+ dedicated1),
+ hostA);
+ assertNoVetoes(
+ checkConstraint(
+ new Identity().setRole("kestrel"),
+ "kestrel",
+ hostA,
+ DEDICATED_ATTRIBUTE,
+ true,
+ dedicated2),
+ hostA);
}
@Test
- public void testMultiValuedAttributes() throws Exception {
- expectGetHostAttributes(HOST_A, valueAttribute("jvm", "1.0", "2.0", "3.0")).anyTimes();
- expectGetHostAttributes(HOST_B, valueAttribute("jvm", "1.0")).anyTimes();
-
+ public void testMultiValuedAttributes() {
control.replay();
- checkConstraint(HOST_A, "jvm", true, "1.0");
- checkConstraint(HOST_A, "jvm", false, "4.0");
+ IHostAttributes hostA = hostAttributes(HOST_A, valueAttribute("jvm", "1.0", "2.0", "3.0"));
+ checkConstraint(hostA, "jvm", true, "1.0");
+ checkConstraint(hostA, "jvm", false, "4.0");
- checkConstraint(HOST_A, "jvm", true, "1.0", "2.0");
- checkConstraint(HOST_B, "jvm", false, "2.0", "3.0");
+ checkConstraint(hostA, "jvm", true, "1.0", "2.0");
+ IHostAttributes hostB = hostAttributes(HOST_A, valueAttribute("jvm", "1.0"));
+ checkConstraint(hostB, "jvm", false, "2.0", "3.0");
}
@Test
- public void testHostScheduledForMaintenance() throws Exception {
- expectGetHostAttributes(HOST_A, host(HOST_A), rack(RACK_A)).atLeastOnce();
-
+ public void testHostScheduledForMaintenance() {
control.replay();
- assertNoVetoes(makeTask(), HOST_A, MaintenanceMode.SCHEDULED);
+ assertNoVetoes(
+ makeTask(),
+ hostAttributes(HOST_A, MaintenanceMode.SCHEDULED, host(HOST_A), rack(RACK_A)));
}
@Test
- public void testHostDrainingForMaintenance() throws Exception {
- expectGetHostAttributes(HOST_A, host(HOST_A), rack(RACK_A)).atLeastOnce();
-
+ public void testHostDrainingForMaintenance() {
control.replay();
assertVetoes(makeTask(),
- MaintenanceMode.DRAINING,
+ hostAttributes(HOST_A, MaintenanceMode.DRAINING, host(HOST_A), rack(RACK_A)),
ConstraintFilter.maintenanceVeto("draining"));
}
@Test
- public void testHostDrainedForMaintenance() throws Exception {
- expectGetHostAttributes(HOST_A, host(HOST_A), rack(RACK_A)).atLeastOnce();
-
+ public void testHostDrainedForMaintenance() {
control.replay();
- assertVetoes(makeTask(), MaintenanceMode.DRAINED, ConstraintFilter.maintenanceVeto("drained"));
+ assertVetoes(
+ makeTask(),
+ hostAttributes(HOST_A, MaintenanceMode.DRAINED, host(HOST_A), rack(RACK_A)),
+ ConstraintFilter.maintenanceVeto("drained"));
}
@Test
- public void testMultipleTaskConstraints() throws Exception {
- expectGetHostAttributes(HOST_A, dedicated(HOST_A), host(HOST_A));
- expectGetHostAttributes(HOST_B, dedicated("xxx"), host(HOST_A));
-
+ public void testMultipleTaskConstraints() {
control.replay();
Constraint constraint1 = makeConstraint("host", HOST_A);
@@ -288,40 +282,36 @@ public class SchedulingFilterImplTest extends EasyMockTest {
assertVetoes(
makeTask(OWNER_A, JOB_A, constraint1, constraint2),
- HOST_A,
+ hostAttributes(HOST_A, dedicated(HOST_A), host(HOST_A)),
mismatchVeto(DEDICATED_ATTRIBUTE));
- assertNoVetoes(makeTask(OWNER_B, JOB_B, constraint1, constraint2), HOST_B, DEFAULT_MODE);
+ assertNoVetoes(
+ makeTask(OWNER_B, JOB_B, constraint1, constraint2),
+ hostAttributes(HOST_B, dedicated("xxx"), host(HOST_A)));
}
@Test
- public void testDedicatedMismatchShortCircuits() throws Exception {
+ public void testDedicatedMismatchShortCircuits() {
// Ensures that a dedicated mismatch short-circuits other filter operations, such as
// evaluation of limit constraints. Reduction of task queries is the desired outcome.
- expectGetHostAttributes(HOST_A, host(HOST_A));
- expectGetHostAttributes(HOST_B, dedicated(OWNER_B.getRole() + "/" + JOB_B), host(HOST_B));
-
control.replay();
Constraint hostLimit = limitConstraint("host", 1);
assertVetoes(
makeTask(OWNER_A, JOB_A, hostLimit, makeConstraint(DEDICATED_ATTRIBUTE, "xxx")),
- HOST_A,
+ hostAttributes(HOST_A, host(HOST_A)),
mismatchVeto(DEDICATED_ATTRIBUTE));
assertVetoes(
makeTask(OWNER_B, JOB_A, hostLimit, makeConstraint(DEDICATED_ATTRIBUTE, "xxx")),
- HOST_B,
+ hostAttributes(HOST_B, dedicated(OWNER_B.getRole() + "/" + JOB_B), host(HOST_B)),
mismatchVeto(DEDICATED_ATTRIBUTE));
}
@Test
- public void testUnderLimitNoTasks() throws Exception {
- expectGetHostAttributes(HOST_A);
- expectGetHostAttributes(HOST_A, host(HOST_A));
-
+ public void testUnderLimitNoTasks() {
control.replay();
- assertNoVetoes(hostLimitTask(2), HOST_A, DEFAULT_MODE);
+ assertNoVetoes(hostLimitTask(2), hostAttributes(HOST_A, host(HOST_A)));
}
private Attribute host(String host) {
@@ -356,90 +346,89 @@ public class SchedulingFilterImplTest extends EasyMockTest {
control.replay();
- assertNoVetoes(hostLimitTask(OWNER_A, JOB_A, 2), HOST_A, stateA);
+ IHostAttributes hostA = hostAttributes(HOST_A, host(HOST_A), rack(RACK_A));
+ IHostAttributes hostB = hostAttributes(HOST_B, host(HOST_B), rack(RACK_A));
+ IHostAttributes hostC = hostAttributes(HOST_C, host(HOST_C), rack(RACK_B));
+ assertNoVetoes(hostLimitTask(OWNER_A, JOB_A, 2), hostA, stateA);
assertVetoes(
hostLimitTask(OWNER_A, JOB_A, 1),
- HOST_B,
+ hostB,
stateA,
- DEFAULT_MODE,
limitVeto(HOST_ATTRIBUTE));
assertVetoes(
hostLimitTask(OWNER_A, JOB_A, 2),
- HOST_B,
+ hostB,
stateA,
- DEFAULT_MODE,
limitVeto(HOST_ATTRIBUTE));
- assertNoVetoes(hostLimitTask(OWNER_A, JOB_A, 3), HOST_B, stateA);
+ assertNoVetoes(hostLimitTask(OWNER_A, JOB_A, 3), hostB, stateA);
assertVetoes(
rackLimitTask(OWNER_B, JOB_A, 2),
- HOST_B,
+ hostB,
stateB,
- DEFAULT_MODE,
limitVeto(RACK_ATTRIBUTE));
assertVetoes(
rackLimitTask(OWNER_B, JOB_A, 3),
- HOST_B,
+ hostB,
stateB,
- DEFAULT_MODE,
limitVeto(RACK_ATTRIBUTE));
- assertNoVetoes(rackLimitTask(OWNER_B, JOB_A, 4), HOST_B, stateB);
+ assertNoVetoes(rackLimitTask(OWNER_B, JOB_A, 4), hostB, stateB);
- assertNoVetoes(rackLimitTask(OWNER_B, JOB_A, 1), HOST_C, stateB);
+ assertNoVetoes(rackLimitTask(OWNER_B, JOB_A, 1), hostC, stateB);
assertVetoes(
rackLimitTask(OWNER_A, JOB_A, 1),
- HOST_C,
+ hostC,
stateA,
- DEFAULT_MODE,
limitVeto(RACK_ATTRIBUTE));
- assertNoVetoes(rackLimitTask(OWNER_B, JOB_A, 2), HOST_C, stateB);
+ assertNoVetoes(rackLimitTask(OWNER_B, JOB_A, 2), hostC, stateB);
}
@Test
- public void testAttribute() throws Exception {
- expectGetHostAttributes(HOST_A, valueAttribute("jvm", "1.0")).atLeastOnce();
-
+ public void testAttribute() {
control.replay();
+ IHostAttributes hostA = hostAttributes(HOST_A, valueAttribute("jvm", "1.0"));
+
// Matches attribute, matching value.
- checkConstraint(HOST_A, "jvm", true, "1.0");
+ checkConstraint(hostA, "jvm", true, "1.0");
// Matches attribute, different value.
- checkConstraint(HOST_A, "jvm", false, "1.4");
+ checkConstraint(hostA, "jvm", false, "1.4");
// Does not match attribute.
- checkConstraint(HOST_A, "xxx", false, "1.4");
+ checkConstraint(hostA, "xxx", false, "1.4");
// Logical 'OR' matching attribute.
- checkConstraint(HOST_A, "jvm", false, "1.2", "1.4");
+ checkConstraint(hostA, "jvm", false, "1.2", "1.4");
// Logical 'OR' not matching attribute.
- checkConstraint(HOST_A, "xxx", false, "1.0", "1.4");
+ checkConstraint(hostA, "xxx", false, "1.0", "1.4");
}
@Test
- public void testAttributes() throws Exception {
- expectGetHostAttributes(HOST_A,
- valueAttribute("jvm", "1.4", "1.6", "1.7"),
- valueAttribute("zone", "a", "b", "c")).atLeastOnce();
-
+ public void testAttributes() {
control.replay();
+ IHostAttributes hostA = hostAttributes(
+ HOST_A,
+ valueAttribute("jvm", "1.4", "1.6", "1.7"),
+ valueAttribute("zone", "a", "b", "c"));
+
// Matches attribute, matching value.
- checkConstraint(HOST_A, "jvm", true, "1.4");
+ checkConstraint(hostA, "jvm", true, "1.4");
// Matches attribute, different value.
- checkConstraint(HOST_A, "jvm", false, "1.0");
+ checkConstraint(hostA, "jvm", false, "1.0");
// Does not match attribute.
- checkConstraint(HOST_A, "xxx", false, "1.4");
+ checkConstraint(hostA, "xxx", false, "1.4");
// Logical 'OR' with attribute and value match.
- checkConstraint(HOST_A, "jvm", true, "1.2", "1.4");
+ checkConstraint(hostA, "jvm", true, "1.2", "1.4");
// Does not match attribute.
- checkConstraint(HOST_A, "xxx", false, "1.0", "1.4");
+ checkConstraint(hostA, "xxx", false, "1.0", "1.4");
// Check that logical AND works.
Constraint jvmConstraint = makeConstraint("jvm", "1.6");
@@ -447,12 +436,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
ITaskConfig task = makeTask(OWNER_A, JOB_A, jvmConstraint, zoneConstraint);
assertTrue(
- defaultFilter.filter(
- DEFAULT_OFFER,
- HOST_A,
- DEFAULT_MODE,
- task, TASK_ID,
- emptyJob).isEmpty());
+ defaultFilter.filter(DEFAULT_OFFER, hostA, task, TASK_ID, emptyJob).isEmpty());
Constraint jvmNegated = jvmConstraint.deepCopy();
jvmNegated.getConstraint().getValue().setNegated(true);
@@ -460,7 +444,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
zoneNegated.getConstraint().getValue().setNegated(true);
assertVetoes(
makeTask(OWNER_A, JOB_A, jvmNegated, zoneNegated),
- HOST_A,
+ hostA,
mismatchVeto("jvm"));
}
@@ -475,22 +459,22 @@ public class SchedulingFilterImplTest extends EasyMockTest {
}
@Test
- public void testDuplicatedAttribute() throws Exception {
- expectGetHostAttributes(HOST_A,
- valueAttribute("jvm", "1.4"),
- valueAttribute("jvm", "1.6", "1.7")).atLeastOnce();
-
+ public void testDuplicatedAttribute() {
control.replay();
+ IHostAttributes hostA = hostAttributes(HOST_A,
+ valueAttribute("jvm", "1.4"),
+ valueAttribute("jvm", "1.6", "1.7"));
+
// Matches attribute, matching value.
- checkConstraint(HOST_A, "jvm", true, "1.4");
- checkConstraint(HOST_A, "jvm", true, "1.6");
- checkConstraint(HOST_A, "jvm", true, "1.7");
- checkConstraint(HOST_A, "jvm", true, "1.6", "1.7");
+ checkConstraint(hostA, "jvm", true, "1.4");
+ checkConstraint(hostA, "jvm", true, "1.6");
+ checkConstraint(hostA, "jvm", true, "1.7");
+ checkConstraint(hostA, "jvm", true, "1.6", "1.7");
}
private ITaskConfig checkConstraint(
- String host,
+ IHostAttributes hostAttributes,
String constraintName,
boolean expected,
String value,
@@ -499,7 +483,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
return checkConstraint(
OWNER_A,
JOB_A,
- host,
+ hostAttributes,
constraintName,
expected,
value,
@@ -509,13 +493,19 @@ public class SchedulingFilterImplTest extends EasyMockTest {
private ITaskConfig checkConstraint(
Identity owner,
String jobName,
- String host,
+ IHostAttributes hostAttributes,
String constraintName,
boolean expected,
String value,
String... vs) {
- return checkConstraint(owner, jobName, emptyJob, host, constraintName, expected,
+ return checkConstraint(
+ owner,
+ jobName,
+ emptyJob,
+ hostAttributes,
+ constraintName,
+ expected,
new ValueConstraint(false,
ImmutableSet.<String>builder().add(value).addAll(Arrays.asList(vs)).build()));
}
@@ -524,7 +514,7 @@ public class SchedulingFilterImplTest extends EasyMockTest {
Identity owner,
String jobName,
AttributeAggregate aggregate,
- String host,
+ IHostAttributes hostAttributes,
String constraintName,
boolean expected,
ValueConstraint value) {
@@ -534,9 +524,8 @@ public class SchedulingFilterImplTest extends EasyMockTest {
assertEquals(
expected,
defaultFilter.filter(
- DEFAULT_OFFER,
- host,
- DEFAULT_MODE,
+ DEFAULT_OFFER,
+ hostAttributes,
task,
TASK_ID,
aggregate).isEmpty());
@@ -548,63 +537,57 @@ public class SchedulingFilterImplTest extends EasyMockTest {
!expected,
defaultFilter.filter(
DEFAULT_OFFER,
- host,
- DEFAULT_MODE,
+ hostAttributes,
negatedTask,
TASK_ID,
aggregate).isEmpty());
return task;
}
- private void assertNoVetoes(ITaskConfig task) {
- assertNoVetoes(task, emptyJob);
- }
-
- private void assertNoVetoes(ITaskConfig task, AttributeAggregate jobState) {
- assertNoVetoes(task, HOST_A, jobState);
- }
-
- private void assertNoVetoes(ITaskConfig task, String host, MaintenanceMode mode) {
- assertVetoes(task, host, emptyJob, mode);
+ private void assertNoVetoes(ITaskConfig task, IHostAttributes hostAttributes) {
+ assertVetoes(task, hostAttributes, emptyJob);
}
private void assertNoVetoes(
ITaskConfig task,
- String host,
+ IHostAttributes attributes,
AttributeAggregate jobState) {
- assertVetoes(task, host, jobState, DEFAULT_MODE);
+
+ assertVetoes(task, attributes, jobState);
}
- private void assertVetoes(ITaskConfig task, MaintenanceMode mode, Veto... vetos) {
- assertVetoes(task, emptyJob, mode, vetos);
+ private void assertVetoes(ITaskConfig task, IHostAttributes hostAttributes, Veto... vetoes) {
+ assertVetoes(task, hostAttributes, emptyJob, vetoes);
}
private void assertVetoes(
ITaskConfig task,
+ IHostAttributes hostAttributes,
AttributeAggregate jobState,
- MaintenanceMode mode,
- Veto... vetos) {
- assertVetoes(task, HOST_A, jobState, mode, vetos);
+ Veto... vetoes) {
+
+ assertEquals(
+ ImmutableSet.copyOf(vetoes),
+ defaultFilter.filter(DEFAULT_OFFER, hostAttributes, task, TASK_ID, jobState));
}
- private void assertVetoes(
- ITaskConfig task,
+ private static IHostAttributes hostAttributes(
String host,
- Veto... vetoes) {
+ MaintenanceMode mode,
+ Attribute... attributes) {
- assertVetoes(task, host, emptyJob, DEFAULT_MODE, vetoes);
+ return IHostAttributes.build(
+ new HostAttributes()
+ .setHost(host)
+ .setMode(mode)
+ .setAttributes(ImmutableSet.<Attribute>builder().add(attributes).build()));
}
- private void assertVetoes(
- ITaskConfig task,
+ private static IHostAttributes hostAttributes(
String host,
- AttributeAggregate jobState,
- MaintenanceMode mode,
- Veto... vetoes) {
+ Attribute... attributes) {
- assertEquals(
- ImmutableSet.copyOf(vetoes),
- defaultFilter.filter(DEFAULT_OFFER, host, mode, task, TASK_ID, jobState));
+ return hostAttributes(host, MaintenanceMode.NONE, attributes);
}
private Attribute valueAttribute(String name, String string, String... strings) {
@@ -676,11 +659,11 @@ public class SchedulingFilterImplTest extends EasyMockTest {
.setExecutorConfig(new ExecutorConfig("aurora", "config"))));
}
- private ITaskConfig makeTask(int cpus, long ramMb, long diskMb) throws Exception {
+ private ITaskConfig makeTask(int cpus, long ramMb, long diskMb) {
return makeTask(OWNER_A, JOB_A, cpus, ramMb, diskMb);
}
- private ITaskConfig makeTask() throws Exception {
+ private ITaskConfig makeTask() {
return makeTask(DEFAULT_CPUS, DEFAULT_RAM, DEFAULT_DISK);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java
index 052562f..d02c6b3 100644
--- a/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
import com.google.common.testing.TearDown;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.AbstractModule;
@@ -32,8 +33,10 @@ import com.twitter.common.application.Lifecycle;
import com.twitter.common.base.Command;
import com.twitter.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.gen.Attribute;
import org.apache.aurora.gen.HostAttributes;
import org.apache.aurora.gen.MaintenanceMode;
+import org.apache.aurora.scheduler.HostOffer;
import org.apache.aurora.scheduler.TaskLauncher;
import org.apache.aurora.scheduler.base.Conversions;
import org.apache.aurora.scheduler.base.SchedulerException;
@@ -47,7 +50,6 @@ import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
import org.apache.mesos.Protos.ExecutorID;
import org.apache.mesos.Protos.FrameworkID;
import org.apache.mesos.Protos.MasterInfo;
-import org.apache.mesos.Protos.Offer;
import org.apache.mesos.Protos.OfferID;
import org.apache.mesos.Protos.SlaveID;
import org.apache.mesos.Protos.TaskID;
@@ -58,6 +60,8 @@ import org.junit.Before;
import org.junit.Test;
import static org.apache.aurora.gen.MaintenanceMode.DRAINING;
+import static org.apache.aurora.gen.MaintenanceMode.NONE;
+import static org.apache.mesos.Protos.Offer;
import static org.easymock.EasyMock.expect;
import static org.junit.Assert.assertTrue;
@@ -75,18 +79,32 @@ public class MesosSchedulerImplTest extends EasyMockTest {
ExecutorID.newBuilder().setValue("executor-id").build();
private static final OfferID OFFER_ID = OfferID.newBuilder().setValue("offer-id").build();
- private static final Offer OFFER = Offer.newBuilder()
- .setFrameworkId(FRAMEWORK)
- .setSlaveId(SLAVE_ID)
- .setHostname(SLAVE_HOST)
- .setId(OFFER_ID)
- .build();
+ private static final HostOffer OFFER = new HostOffer(
+ Offer.newBuilder()
+ .setFrameworkId(FRAMEWORK)
+ .setSlaveId(SLAVE_ID)
+ .setHostname(SLAVE_HOST)
+ .setId(OFFER_ID)
+ .build(),
+ IHostAttributes.build(
+ new HostAttributes()
+ .setHost(SLAVE_HOST)
+ .setSlaveId(SLAVE_ID.getValue())
+ .setMode(NONE)
+ .setAttributes(ImmutableSet.<Attribute>of())));
private static final OfferID OFFER_ID_2 = OfferID.newBuilder().setValue("offer-id-2").build();
- private static final Offer OFFER_2 = Offer.newBuilder(OFFER)
- .setSlaveId(SLAVE_ID_2)
- .setHostname(SLAVE_HOST_2)
- .setId(OFFER_ID_2)
- .build();
+ private static final HostOffer OFFER_2 = new HostOffer(
+ Offer.newBuilder(OFFER.getOffer())
+ .setSlaveId(SLAVE_ID_2)
+ .setHostname(SLAVE_HOST_2)
+ .setId(OFFER_ID_2)
+ .build(),
+ IHostAttributes.build(
+ new HostAttributes()
+ .setHost(SLAVE_HOST_2)
+ .setSlaveId(SLAVE_ID_2.getValue())
+ .setMode(NONE)
+ .setAttributes(ImmutableSet.<Attribute>of())));
private static final TaskStatus STATUS = TaskStatus.newBuilder()
.setState(TaskState.TASK_RUNNING)
@@ -184,15 +202,17 @@ public class MesosSchedulerImplTest extends EasyMockTest {
new OfferFixture() {
@Override
void respondToOffer() throws Exception {
- IHostAttributes draining = IHostAttributes.build(new HostAttributes().setMode(DRAINING));
- expect(storageUtil.attributeStore.getHostAttributes(OFFER.getHostname()))
+ IHostAttributes draining =
+ IHostAttributes.build(OFFER.getAttributes().newBuilder().setMode(DRAINING));
+ expect(storageUtil.attributeStore.getHostAttributes(OFFER.getOffer().getHostname()))
.andReturn(Optional.of(draining));
IHostAttributes saved = IHostAttributes.build(
- Conversions.getAttributes(OFFER).newBuilder().setMode(DRAINING));
+ Conversions.getAttributes(OFFER.getOffer()).newBuilder().setMode(DRAINING));
expect(storageUtil.attributeStore.saveHostAttributes(saved)).andReturn(true);
- expect(systemLauncher.willUse(OFFER)).andReturn(false);
- expect(userLauncher.willUse(OFFER)).andReturn(true);
+ HostOffer offer = new HostOffer(OFFER.getOffer(), draining);
+ expect(systemLauncher.willUse(offer)).andReturn(false);
+ expect(userLauncher.willUse(offer)).andReturn(true);
}
}.run();
}
@@ -255,7 +275,7 @@ public class MesosSchedulerImplTest extends EasyMockTest {
@Override
void test() {
- scheduler.resourceOffers(driver, ImmutableList.of(OFFER, OFFER_2));
+ scheduler.resourceOffers(driver, ImmutableList.of(OFFER.getOffer(), OFFER_2.getOffer()));
}
}.run();
}
@@ -286,11 +306,11 @@ public class MesosSchedulerImplTest extends EasyMockTest {
"hello".getBytes(StandardCharsets.UTF_8));
}
- private void expectOfferAttributesSaved(Offer offer) {
- expect(storageUtil.attributeStore.getHostAttributes(offer.getHostname()))
+ private void expectOfferAttributesSaved(HostOffer offer) {
+ expect(storageUtil.attributeStore.getHostAttributes(offer.getOffer().getHostname()))
.andReturn(Optional.<IHostAttributes>absent());
IHostAttributes defaultMode = IHostAttributes.build(
- Conversions.getAttributes(offer).newBuilder().setMode(MaintenanceMode.NONE));
+ Conversions.getAttributes(offer.getOffer()).newBuilder().setMode(MaintenanceMode.NONE));
expect(storageUtil.attributeStore.saveHostAttributes(defaultMode)).andReturn(true);
}
@@ -341,7 +361,7 @@ public class MesosSchedulerImplTest extends EasyMockTest {
@Override
void test() {
- scheduler.resourceOffers(driver, ImmutableList.of(OFFER));
+ scheduler.resourceOffers(driver, ImmutableList.of(OFFER.getOffer()));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
index 563c1be..8d69ae9 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/TaskAssignerImplTest.java
@@ -20,9 +20,10 @@ import com.twitter.common.testing.easymock.EasyMockTest;
import org.apache.aurora.gen.AssignedTask;
import org.apache.aurora.gen.ExecutorConfig;
-import org.apache.aurora.gen.MaintenanceMode;
+import org.apache.aurora.gen.HostAttributes;
import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.HostOffer;
import org.apache.aurora.scheduler.ResourceSlot;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.filter.AttributeAggregate;
@@ -31,9 +32,9 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
import org.apache.aurora.scheduler.mesos.MesosTaskFactory;
import org.apache.aurora.scheduler.state.TaskAssigner.TaskAssignerImpl;
import org.apache.aurora.scheduler.storage.AttributeStore;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.mesos.Protos.FrameworkID;
-import org.apache.mesos.Protos.Offer;
import org.apache.mesos.Protos.OfferID;
import org.apache.mesos.Protos.Resource;
import org.apache.mesos.Protos.SlaveID;
@@ -45,14 +46,14 @@ import org.apache.mesos.Protos.Value.Type;
import org.junit.Before;
import org.junit.Test;
-import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer;
+import static org.apache.mesos.Protos.Offer;
import static org.easymock.EasyMock.expect;
import static org.junit.Assert.assertEquals;
public class TaskAssignerImplTest extends EasyMockTest {
private static final int PORT = 5000;
- private static final Offer OFFER = Offer.newBuilder()
+ private static final Offer MESOS_OFFER = Offer.newBuilder()
.setId(OfferID.newBuilder().setValue("offerId"))
.setFrameworkId(FrameworkID.newBuilder().setValue("frameworkId"))
.setSlaveId(SlaveID.newBuilder().setValue("slaveId"))
@@ -63,7 +64,8 @@ public class TaskAssignerImplTest extends EasyMockTest {
.setRanges(
Ranges.newBuilder().addRange(Range.newBuilder().setBegin(PORT).setEnd(PORT))))
.build();
- private static final HostOffer HOST_OFFER = new HostOffer(OFFER, MaintenanceMode.NONE);
+ private static final HostOffer OFFER =
+ new HostOffer(MESOS_OFFER, IHostAttributes.build(new HostAttributes()));
private static final IScheduledTask TASK = IScheduledTask.build(
new ScheduledTask()
.setAssignedTask(new AssignedTask()
@@ -74,7 +76,7 @@ public class TaskAssignerImplTest extends EasyMockTest {
private static final TaskInfo TASK_INFO = TaskInfo.newBuilder()
.setName("taskName")
.setTaskId(TaskID.newBuilder().setValue(Tasks.id(TASK)))
- .setSlaveId(OFFER.getSlaveId())
+ .setSlaveId(MESOS_OFFER.getSlaveId())
.build();
private StateManager stateManager;
@@ -97,32 +99,31 @@ public class TaskAssignerImplTest extends EasyMockTest {
@Test
public void testAssignNoVetoes() {
expect(filter.filter(
- ResourceSlot.from(OFFER),
- OFFER.getHostname(),
- MaintenanceMode.NONE,
+ ResourceSlot.from(MESOS_OFFER),
+ OFFER.getAttributes(),
TASK.getAssignedTask().getTask(),
Tasks.id(TASK),
emptyJob))
.andReturn(ImmutableSet.<Veto>of());
expect(stateManager.assignTask(
Tasks.id(TASK),
- OFFER.getHostname(),
- OFFER.getSlaveId(),
+ MESOS_OFFER.getHostname(),
+ MESOS_OFFER.getSlaveId(),
ImmutableSet.of(PORT)))
.andReturn(TASK.getAssignedTask());
- expect(taskFactory.createFrom(TASK.getAssignedTask(), OFFER.getSlaveId())).andReturn(TASK_INFO);
+ expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER.getSlaveId()))
+ .andReturn(TASK_INFO);
control.replay();
- assertEquals(Optional.of(TASK_INFO), assigner.maybeAssign(HOST_OFFER, TASK, emptyJob));
+ assertEquals(Optional.of(TASK_INFO), assigner.maybeAssign(OFFER, TASK, emptyJob));
}
@Test
public void testAssignVetoes() {
expect(filter.filter(
- ResourceSlot.from(OFFER),
- OFFER.getHostname(),
- MaintenanceMode.NONE,
+ ResourceSlot.from(MESOS_OFFER),
+ OFFER.getAttributes(),
TASK.getAssignedTask().getTask(),
Tasks.id(TASK),
emptyJob))
@@ -130,6 +131,6 @@ public class TaskAssignerImplTest extends EasyMockTest {
control.replay();
- assertEquals(Optional.<TaskInfo>absent(), assigner.maybeAssign(HOST_OFFER, TASK, emptyJob));
+ assertEquals(Optional.<TaskInfo>absent(), assigner.maybeAssign(OFFER, TASK, emptyJob));
}
}
[2/2] incubator-aurora git commit: Store host attributes alongside
offers to reduce number of lookups.
Posted by wf...@apache.org.
Store host attributes alongside offers to reduce number of lookups.
Bugs closed: AURORA-913
Reviewed at https://reviews.apache.org/r/27902/
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/b80e69c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/b80e69c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/b80e69c9
Branch: refs/heads/master
Commit: b80e69c9b9d0e18b9ea1189fd310e6995e4e1fe9
Parents: 2e2c4b3
Author: Bill Farner <wf...@apache.org>
Authored: Wed Nov 12 16:10:03 2014 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Wed Nov 12 16:10:03 2014 -0800
----------------------------------------------------------------------
.../org/apache/aurora/scheduler/HostOffer.java | 66 ++++
.../apache/aurora/scheduler/TaskLauncher.java | 3 +-
.../aurora/scheduler/UserTaskLauncher.java | 3 +-
.../scheduler/async/GcExecutorLauncher.java | 18 +-
.../aurora/scheduler/async/OfferQueue.java | 125 ++-----
.../aurora/scheduler/async/Preemptor.java | 67 ++--
.../async/RandomJitterReturnDelay.java | 4 +-
.../aurora/scheduler/async/TaskScheduler.java | 10 +-
.../events/NotifyingSchedulingFilter.java | 7 +-
.../scheduler/filter/SchedulingFilter.java | 7 +-
.../scheduler/filter/SchedulingFilterImpl.java | 86 ++---
.../apache/aurora/scheduler/http/Offers.java | 4 +-
.../scheduler/mesos/MesosSchedulerImpl.java | 44 ++-
.../scheduler/state/MaintenanceController.java | 1 +
.../aurora/scheduler/state/TaskAssigner.java | 19 +-
.../scheduler/stats/AsyncStatsModule.java | 9 +-
.../aurora/scheduler/UserTaskLauncherTest.java | 13 +-
.../scheduler/async/GcExecutorLauncherTest.java | 24 +-
.../scheduler/async/OfferQueueImplTest.java | 136 +++-----
.../scheduler/async/PreemptorImplTest.java | 53 +--
.../scheduler/async/TaskSchedulerImplTest.java | 9 +-
.../scheduler/async/TaskSchedulerTest.java | 107 +++---
.../events/NotifyingSchedulingFilterTest.java | 14 +-
.../filter/SchedulingFilterImplTest.java | 335 +++++++++----------
.../scheduler/mesos/MesosSchedulerImplTest.java | 64 ++--
.../scheduler/state/TaskAssignerImplTest.java | 35 +-
26 files changed, 625 insertions(+), 638 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/HostOffer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/HostOffer.java b/src/main/java/org/apache/aurora/scheduler/HostOffer.java
new file mode 100644
index 0000000..5056b60
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/HostOffer.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler;
+
+import java.util.Objects;
+
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+
+import static java.util.Objects.requireNonNull;
+
+import static org.apache.mesos.Protos.Offer;
+
+/**
+ * An available resource in the cluster.
+ */
+public class HostOffer {
+ private final Offer offer;
+ private final IHostAttributes hostAttributes;
+
+ public HostOffer(Offer offer, IHostAttributes hostAttributes) {
+ this.offer = requireNonNull(offer);
+ this.hostAttributes = requireNonNull(hostAttributes);
+ }
+
+ public Offer getOffer() {
+ return offer;
+ }
+
+ public IHostAttributes getAttributes() {
+ return hostAttributes;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof HostOffer)) {
+ return false;
+ }
+ HostOffer other = (HostOffer) o;
+ return Objects.equals(offer, other.offer)
+ && Objects.equals(hostAttributes, other.hostAttributes);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(offer, hostAttributes);
+ }
+
+ @Override
+ public String toString() {
+ return com.google.common.base.Objects.toStringHelper(this)
+ .add("offer", offer)
+ .add("hostAttributes", hostAttributes)
+ .toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/TaskLauncher.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/TaskLauncher.java b/src/main/java/org/apache/aurora/scheduler/TaskLauncher.java
index c13520a..cd55a6e 100644
--- a/src/main/java/org/apache/aurora/scheduler/TaskLauncher.java
+++ b/src/main/java/org/apache/aurora/scheduler/TaskLauncher.java
@@ -13,7 +13,6 @@
*/
package org.apache.aurora.scheduler;
-import org.apache.mesos.Protos.Offer;
import org.apache.mesos.Protos.OfferID;
import org.apache.mesos.Protos.TaskStatus;
@@ -33,7 +32,7 @@ public interface TaskLauncher {
* @return {@code false} if the launcher will not act on the offer, or {@code true} if the
* launcher may accept the offer at some point in the future.
*/
- boolean willUse(Offer offer);
+ boolean willUse(HostOffer offer);
/**
* Informs the launcher that a status update has been received for a task. If the task is not
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java b/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
index 250c2df..e1b7d05 100644
--- a/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
+++ b/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java
@@ -27,7 +27,6 @@ import org.apache.aurora.scheduler.async.OfferQueue;
import org.apache.aurora.scheduler.base.Conversions;
import org.apache.aurora.scheduler.base.SchedulerException;
import org.apache.aurora.scheduler.state.StateManager;
-import org.apache.mesos.Protos.Offer;
import org.apache.mesos.Protos.OfferID;
import org.apache.mesos.Protos.TaskStatus;
@@ -56,7 +55,7 @@ class UserTaskLauncher implements TaskLauncher {
}
@Override
- public boolean willUse(Offer offer) {
+ public boolean willUse(HostOffer offer) {
requireNonNull(offer);
offerQueue.addOffer(offer);
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java b/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java
index 79d8d8d..e02921d 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java
@@ -41,6 +41,7 @@ import org.apache.aurora.Protobufs;
import org.apache.aurora.codec.ThriftBinaryCodec;
import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
import org.apache.aurora.gen.comm.AdjustRetainedTasks;
+import org.apache.aurora.scheduler.HostOffer;
import org.apache.aurora.scheduler.TaskLauncher;
import org.apache.aurora.scheduler.base.CommandUtil;
import org.apache.aurora.scheduler.base.Query;
@@ -52,7 +53,6 @@ import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.mesos.Protos;
import org.apache.mesos.Protos.ExecutorID;
import org.apache.mesos.Protos.ExecutorInfo;
-import org.apache.mesos.Protos.Offer;
import org.apache.mesos.Protos.OfferID;
import org.apache.mesos.Protos.SlaveID;
import org.apache.mesos.Protos.TaskID;
@@ -192,20 +192,22 @@ public class GcExecutorLauncher implements TaskLauncher {
Maps.transformValues(Tasks.mapById(tasksOnHost), Tasks.GET_STATUS)));
}
- private boolean sufficientResources(Offer offer) {
- boolean sufficient = Resources.from(offer).greaterThanOrEqual(TOTAL_GC_EXECUTOR_RESOURCES);
+ private boolean sufficientResources(HostOffer offer) {
+ boolean sufficient =
+ Resources.from(offer.getOffer()).greaterThanOrEqual(TOTAL_GC_EXECUTOR_RESOURCES);
if (!sufficient) {
- LOG.warning("Offer for host " + offer.getHostname() + " is too small for a GC executor");
+ LOG.warning("Offer for host " + offer.getOffer().getHostname()
+ + " is too small for a GC executor");
insufficientOffers.incrementAndGet();
}
return sufficient;
}
@Override
- public boolean willUse(final Offer offer) {
+ public boolean willUse(final HostOffer offer) {
if (!settings.getGcExecutorPath().isPresent()
|| !sufficientResources(offer)
- || !isTimeToCollect(offer.getHostname())) {
+ || !isTimeToCollect(offer.getOffer().getHostname())) {
return false;
}
@@ -213,7 +215,9 @@ public class GcExecutorLauncher implements TaskLauncher {
executor.execute(new Runnable() {
@Override
public void run() {
- driver.launchTask(offer.getId(), makeGcTask(offer.getHostname(), offer.getSlaveId()));
+ driver.launchTask(
+ offer.getOffer().getId(),
+ makeGcTask(offer.getOffer().getHostname(), offer.getOffer().getSlaveId()));
}
});
offersConsumed.incrementAndGet();
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java b/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java
index dd8a900..d2682cd 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/OfferQueue.java
@@ -15,7 +15,6 @@ package org.apache.aurora.scheduler.async;
import java.util.Comparator;
import java.util.Map;
-import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ScheduledExecutorService;
@@ -37,11 +36,11 @@ import com.twitter.common.quantity.Time;
import com.twitter.common.stats.Stats;
import org.apache.aurora.gen.MaintenanceMode;
+import org.apache.aurora.scheduler.HostOffer;
import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
import org.apache.aurora.scheduler.mesos.Driver;
-import org.apache.aurora.scheduler.state.MaintenanceController;
-import org.apache.mesos.Protos.Offer;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.mesos.Protos.OfferID;
import org.apache.mesos.Protos.SlaveID;
import org.apache.mesos.Protos.TaskInfo;
@@ -64,7 +63,7 @@ public interface OfferQueue extends EventSubscriber {
*
* @param offer Newly-available resource offer.
*/
- void addOffer(Offer offer);
+ void addOffer(HostOffer offer);
/**
* Invalidates an offer. This indicates that the scheduler should not attempt to match any
@@ -104,7 +103,7 @@ public interface OfferQueue extends EventSubscriber {
* The delay is calculated for each offer that is received, so the return delay may be
* fixed or variable.
*/
- interface OfferReturnDelay extends Supplier<Amount<Integer, Time>> {
+ interface OfferReturnDelay extends Supplier<Amount<Long, Time>> {
}
/**
@@ -120,51 +119,6 @@ public interface OfferQueue extends EventSubscriber {
}
}
- /**
- * Encapsulate an offer from a host, and the host's maintenance mode.
- */
- class HostOffer {
- private final Offer offer;
-
- // TODO(wfarner): Replace this with HostAttributes for more use of this caching.
- private final MaintenanceMode mode;
-
- public HostOffer(Offer offer, MaintenanceMode mode) {
- this.offer = requireNonNull(offer);
- this.mode = requireNonNull(mode);
- }
-
- public Offer getOffer() {
- return offer;
- }
-
- public MaintenanceMode getMode() {
- return mode;
- }
-
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof HostOffer)) {
- return false;
- }
- HostOffer other = (HostOffer) o;
- return Objects.equals(offer, other.offer) && mode == other.mode;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(offer, mode);
- }
-
- @Override
- public String toString() {
- return com.google.common.base.Objects.toStringHelper(this)
- .add("offer", offer)
- .add("mode", mode)
- .toString();
- }
- }
-
class OfferQueueImpl implements OfferQueue {
private static final Logger LOG = Logger.getLogger(OfferQueueImpl.class.getName());
@@ -174,41 +128,36 @@ public interface OfferQueue extends EventSubscriber {
private final Driver driver;
private final OfferReturnDelay returnDelay;
private final ScheduledExecutorService executor;
- private final MaintenanceController maintenance;
@Inject
- OfferQueueImpl(Driver driver,
- OfferReturnDelay returnDelay,
- ScheduledExecutorService executor,
- MaintenanceController maintenance) {
-
- this.driver = driver;
- this.returnDelay = returnDelay;
- this.executor = executor;
- this.maintenance = maintenance;
+ OfferQueueImpl(Driver driver, OfferReturnDelay returnDelay, ScheduledExecutorService executor) {
+ this.driver = requireNonNull(driver);
+ this.returnDelay = requireNonNull(returnDelay);
+ this.executor = requireNonNull(executor);
}
@Override
- public void addOffer(final Offer offer) {
+ public void addOffer(final HostOffer offer) {
// We run a slight risk of a race here, which is acceptable. The worst case is that we
// temporarily hold two offers for the same host, which should be corrected when we return
// them after the return delay.
// There's also a chance that we return an offer for compaction ~simultaneously with the
// same-host offer being canceled/returned. This is also fine.
- Optional<HostOffer> sameSlave = hostOffers.get(offer.getSlaveId());
+ Optional<HostOffer> sameSlave = hostOffers.get(offer.getOffer().getSlaveId());
if (sameSlave.isPresent()) {
// If there are existing offers for the slave, decline all of them so the master can
// compact all of those offers into a single offer and send them back.
- LOG.info("Returning offers for " + offer.getSlaveId().getValue() + " for compaction.");
- decline(offer.getId());
- removeAndDecline(sameSlave.get().offer.getId());
+ LOG.info("Returning offers for " + offer.getOffer().getSlaveId().getValue()
+ + " for compaction.");
+ decline(offer.getOffer().getId());
+ removeAndDecline(sameSlave.get().getOffer().getId());
} else {
- hostOffers.add(new HostOffer(offer, maintenance.getMode(offer.getHostname())));
+ hostOffers.add(offer);
executor.schedule(
new Runnable() {
@Override
public void run() {
- removeAndDecline(offer.getId());
+ removeAndDecline(offer.getOffer().getId());
}
},
returnDelay.get().as(Time.MILLISECONDS),
@@ -252,7 +201,7 @@ public interface OfferQueue extends EventSubscriber {
*/
@Subscribe
public void hostAttributesChanged(HostAttributesChanged change) {
- hostOffers.updateHostMode(change.getAttributes().getHost(), change.getAttributes().getMode());
+ hostOffers.updateHostAttributes(change.getAttributes());
}
/**
@@ -280,12 +229,12 @@ public interface OfferQueue extends EventSubscriber {
.onResultOf(new Function<HostOffer, MaintenanceMode>() {
@Override
public MaintenanceMode apply(HostOffer offer) {
- return offer.mode;
+ return offer.getAttributes().getMode();
}
})
.compound(Ordering.arbitrary());
- private final Set<HostOffer> hostOffers = new ConcurrentSkipListSet<>(PREFERENCE_COMPARATOR);
+ private final Set<HostOffer> offers = new ConcurrentSkipListSet<>(PREFERENCE_COMPARATOR);
private final Map<OfferID, HostOffer> offersById = Maps.newHashMap();
private final Map<SlaveID, HostOffer> offersBySlave = Maps.newHashMap();
private final Map<String, HostOffer> offersByHost = Maps.newHashMap();
@@ -293,7 +242,7 @@ public interface OfferQueue extends EventSubscriber {
HostOffers() {
// Potential gotcha - since this is a ConcurrentSkipListSet, size() is more expensive.
// Could track this separately if it turns out to pose problems.
- Stats.exportSize("outstanding_offers", hostOffers);
+ Stats.exportSize("outstanding_offers", offers);
}
synchronized Optional<HostOffer> get(SlaveID slaveId) {
@@ -301,37 +250,37 @@ public interface OfferQueue extends EventSubscriber {
}
synchronized void add(HostOffer offer) {
- hostOffers.add(offer);
- offersById.put(offer.offer.getId(), offer);
- offersBySlave.put(offer.offer.getSlaveId(), offer);
- offersByHost.put(offer.offer.getHostname(), offer);
+ offers.add(offer);
+ offersById.put(offer.getOffer().getId(), offer);
+ offersBySlave.put(offer.getOffer().getSlaveId(), offer);
+ offersByHost.put(offer.getOffer().getHostname(), offer);
}
synchronized boolean remove(OfferID id) {
HostOffer removed = offersById.remove(id);
if (removed != null) {
- hostOffers.remove(removed);
- offersBySlave.remove(removed.offer.getSlaveId());
- offersByHost.remove(removed.offer.getHostname());
+ offers.remove(removed);
+ offersBySlave.remove(removed.getOffer().getSlaveId());
+ offersByHost.remove(removed.getOffer().getHostname());
}
return removed != null;
}
- synchronized void updateHostMode(String hostName, MaintenanceMode mode) {
- HostOffer offer = offersByHost.remove(hostName);
+ synchronized void updateHostAttributes(IHostAttributes attributes) {
+ HostOffer offer = offersByHost.remove(attributes.getHost());
if (offer != null) {
// Remove and re-add a host's offer to re-sort based on its new hostStatus
- remove(offer.offer.getId());
- add(new HostOffer(offer.offer, mode));
+ remove(offer.getOffer().getId());
+ add(new HostOffer(offer.getOffer(), attributes));
}
}
synchronized Iterable<HostOffer> getWeaklyConsistentOffers() {
- return Iterables.unmodifiableIterable(hostOffers);
+ return Iterables.unmodifiableIterable(offers);
}
synchronized void clear() {
- hostOffers.clear();
+ offers.clear();
offersById.clear();
offersBySlave.clear();
offersByHost.clear();
@@ -345,17 +294,17 @@ public interface OfferQueue extends EventSubscriber {
// It's important that this method is not called concurrently - doing so would open up the
// possibility of a race between the same offers being accepted by different threads.
- for (HostOffer hostOffer : hostOffers.getWeaklyConsistentOffers()) {
- Optional<TaskInfo> assignment = acceptor.apply(hostOffer);
+ for (HostOffer offer : hostOffers.getWeaklyConsistentOffers()) {
+ Optional<TaskInfo> assignment = acceptor.apply(offer);
if (assignment.isPresent()) {
// Guard against an offer being removed after we grabbed it from the iterator.
// If that happens, the offer will not exist in hostOffers, and we can immediately
// send it back to LOST for quick reschedule.
// Removing while iterating counts on the use of a weakly-consistent iterator being used,
// which is a feature of ConcurrentSkipListSet.
- if (hostOffers.remove(hostOffer.offer.getId())) {
+ if (hostOffers.remove(offer.getOffer().getId())) {
try {
- driver.launchTask(hostOffer.offer.getId(), assignment.get());
+ driver.launchTask(offer.getOffer().getId(), assignment.get());
return true;
} catch (IllegalStateException e) {
// TODO(William Farner): Catch only the checked exception produced by Driver
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java b/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
index a17738e..1d337f6 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/Preemptor.java
@@ -40,19 +40,20 @@ import com.google.common.collect.Sets;
import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;
import com.twitter.common.stats.Stats;
+import com.twitter.common.stats.StatsProvider;
import com.twitter.common.util.Clock;
-import org.apache.aurora.gen.MaintenanceMode;
import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.scheduler.HostOffer;
import org.apache.aurora.scheduler.ResourceSlot;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.filter.AttributeAggregate;
import org.apache.aurora.scheduler.filter.SchedulingFilter;
-import org.apache.aurora.scheduler.state.MaintenanceController;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import static java.lang.annotation.ElementType.FIELD;
@@ -63,8 +64,9 @@ import static java.util.Objects.requireNonNull;
import static org.apache.aurora.gen.ScheduleStatus.PENDING;
import static org.apache.aurora.gen.ScheduleStatus.PREEMPTING;
-import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer;
import static org.apache.aurora.scheduler.base.Tasks.SCHEDULED_TO_ASSIGNED;
+import static org.apache.aurora.scheduler.storage.Storage.StoreProvider;
+import static org.apache.aurora.scheduler.storage.Storage.Work;
/**
* Preempts active tasks in favor of higher priority tasks.
@@ -130,7 +132,7 @@ public interface Preemptor {
private final SchedulingFilter schedulingFilter;
private final Amount<Long, Time> preemptionCandidacyDelay;
private final Clock clock;
- private final MaintenanceController maintenance;
+ private final AtomicLong missingAttributes;
/**
* Creates a new preemptor.
@@ -151,7 +153,7 @@ public interface Preemptor {
SchedulingFilter schedulingFilter,
@PreemptionDelay Amount<Long, Time> preemptionCandidacyDelay,
Clock clock,
- MaintenanceController maintenance) {
+ StatsProvider statsProvider) {
this.storage = requireNonNull(storage);
this.stateManager = requireNonNull(stateManager);
@@ -159,7 +161,7 @@ public interface Preemptor {
this.schedulingFilter = requireNonNull(schedulingFilter);
this.preemptionCandidacyDelay = requireNonNull(preemptionCandidacyDelay);
this.clock = requireNonNull(clock);
- this.maintenance = requireNonNull(maintenance);
+ missingAttributes = statsProvider.makeCounter("preemptor_missing_attributes");
}
private List<IAssignedTask> fetch(Query.Builder query, Predicate<IScheduledTask> filter) {
@@ -200,24 +202,24 @@ public interface Preemptor {
private static final Function<HostOffer, ResourceSlot> OFFER_TO_RESOURCE_SLOT =
new Function<HostOffer, ResourceSlot>() {
@Override
- public ResourceSlot apply(HostOffer hostOffer) {
- return ResourceSlot.from(hostOffer.getOffer());
+ public ResourceSlot apply(HostOffer offer) {
+ return ResourceSlot.from(offer.getOffer());
}
};
private static final Function<HostOffer, String> OFFER_TO_HOST =
new Function<HostOffer, String>() {
@Override
- public String apply(HostOffer hostOffer) {
- return hostOffer.getOffer().getHostname();
+ public String apply(HostOffer offer) {
+ return offer.getOffer().getHostname();
}
};
- private static final Function<HostOffer, MaintenanceMode> OFFER_TO_MODE =
- new Function<HostOffer, MaintenanceMode>() {
+ private static final Function<HostOffer, IHostAttributes> OFFER_TO_ATTRIBUTES =
+ new Function<HostOffer, IHostAttributes>() {
@Override
- public MaintenanceMode apply(HostOffer hostOffer) {
- return hostOffer.getMode();
+ public IHostAttributes apply(HostOffer offer) {
+ return offer.getAttributes();
}
};
@@ -255,18 +257,17 @@ public interface Preemptor {
// us.
return Optional.absent();
}
- MaintenanceMode mode =
- Iterables.getOnlyElement(FluentIterable.from(offers).transform(OFFER_TO_MODE).toSet());
+ IHostAttributes attributes = Iterables.getOnlyElement(
+ FluentIterable.from(offers).transform(OFFER_TO_ATTRIBUTES).toSet());
- Set<SchedulingFilter.Veto> vetos = schedulingFilter.filter(
+ Set<SchedulingFilter.Veto> vetoes = schedulingFilter.filter(
slackResources,
- host,
- mode,
+ attributes,
pendingTask.getTask(),
pendingTask.getTaskId(),
attributeAggregate);
- if (vetos.isEmpty()) {
+ if (vetoes.isEmpty()) {
return Optional.<Set<IAssignedTask>>of(ImmutableSet.<IAssignedTask>of());
}
}
@@ -289,26 +290,40 @@ public interface Preemptor {
ResourceSlot.sum(Iterables.transform(toPreemptTasks, TASK_TO_RESOURCES)),
slackResources);
- Set<SchedulingFilter.Veto> vetos = schedulingFilter.filter(
+ Optional<IHostAttributes> attributes = getHostAttributes(host);
+ if (!attributes.isPresent()) {
+ missingAttributes.incrementAndGet();
+ continue;
+ }
+
+ Set<SchedulingFilter.Veto> vetoes = schedulingFilter.filter(
totalResource,
- host,
- maintenance.getMode(host),
+ attributes.get(),
pendingTask.getTask(),
pendingTask.getTaskId(),
attributeAggregate);
- if (vetos.isEmpty()) {
+ if (vetoes.isEmpty()) {
return Optional.<Set<IAssignedTask>>of(ImmutableSet.copyOf(toPreemptTasks));
}
}
return Optional.absent();
}
+ private Optional<IHostAttributes> getHostAttributes(final String host) {
+ return storage.weaklyConsistentRead(new Work.Quiet<Optional<IHostAttributes>>() {
+ @Override
+ public Optional<IHostAttributes> apply(StoreProvider storeProvider) {
+ return storeProvider.getAttributeStore().getHostAttributes(host);
+ }
+ });
+ }
+
private static final Function<HostOffer, String> OFFER_TO_SLAVE_ID =
new Function<HostOffer, String>() {
@Override
- public String apply(HostOffer hostOffer) {
- return hostOffer.getOffer().getSlaveId().getValue();
+ public String apply(HostOffer offer) {
+ return offer.getOffer().getSlaveId().getValue();
}
};
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/async/RandomJitterReturnDelay.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/RandomJitterReturnDelay.java b/src/main/java/org/apache/aurora/scheduler/async/RandomJitterReturnDelay.java
index d15d9e6..2accb4e 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/RandomJitterReturnDelay.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/RandomJitterReturnDelay.java
@@ -43,7 +43,7 @@ class RandomJitterReturnDelay implements OfferReturnDelay {
}
@Override
- public Amount<Integer, Time> get() {
- return Amount.of(minHoldTimeMs + random.nextInt(maxJitterWindowMs), Time.MILLISECONDS);
+ public Amount<Long, Time> get() {
+ return Amount.of((long) minHoldTimeMs + random.nextInt(maxJitterWindowMs), Time.MILLISECONDS);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
index b23457e..e2ba8b8 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java
@@ -41,6 +41,7 @@ import com.twitter.common.stats.Stats;
import com.twitter.common.stats.StatsProvider;
import com.twitter.common.util.Clock;
+import org.apache.aurora.scheduler.HostOffer;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
@@ -65,7 +66,6 @@ import static java.util.Objects.requireNonNull;
import static org.apache.aurora.gen.ScheduleStatus.LOST;
import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer;
/**
* Enables scheduling and preemption of tasks.
@@ -134,20 +134,20 @@ public interface TaskScheduler extends EventSubscriber {
return new Function<HostOffer, Optional<TaskInfo>>() {
@Override
- public Optional<TaskInfo> apply(HostOffer hostOffer) {
+ public Optional<TaskInfo> apply(HostOffer offer) {
Optional<String> reservedTaskId =
- reservations.getSlaveReservation(hostOffer.getOffer().getSlaveId());
+ reservations.getSlaveReservation(offer.getOffer().getSlaveId());
if (reservedTaskId.isPresent()) {
if (taskId.equals(reservedTaskId.get())) {
// Slave is reserved to satisfy this task.
- return assigner.maybeAssign(hostOffer, task, attributeAggregate);
+ return assigner.maybeAssign(offer, task, attributeAggregate);
} else {
// Slave is reserved for another task.
return Optional.absent();
}
} else {
// Slave is not reserved.
- return assigner.maybeAssign(hostOffer, task, attributeAggregate);
+ return assigner.maybeAssign(offer, task, attributeAggregate);
}
}
};
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java b/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
index fc17cac..ca53303 100644
--- a/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/events/NotifyingSchedulingFilter.java
@@ -20,11 +20,11 @@ import java.util.Set;
import javax.inject.Inject;
import javax.inject.Qualifier;
-import org.apache.aurora.gen.MaintenanceMode;
import org.apache.aurora.scheduler.ResourceSlot;
import org.apache.aurora.scheduler.events.PubsubEvent.Vetoed;
import org.apache.aurora.scheduler.filter.AttributeAggregate;
import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
import static java.lang.annotation.ElementType.FIELD;
@@ -60,13 +60,12 @@ class NotifyingSchedulingFilter implements SchedulingFilter {
@Override
public Set<Veto> filter(
ResourceSlot offer,
- String slaveHost,
- MaintenanceMode mode,
+ IHostAttributes hostAttributes,
ITaskConfig task,
String taskId,
AttributeAggregate jobState) {
- Set<Veto> vetoes = delegate.filter(offer, slaveHost, mode, task, taskId, jobState);
+ Set<Veto> vetoes = delegate.filter(offer, hostAttributes, task, taskId, jobState);
if (!vetoes.isEmpty()) {
eventSink.post(new Vetoed(taskId, vetoes));
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
index c37272c..c1c5f26 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilter.java
@@ -18,8 +18,8 @@ import java.util.Set;
import com.google.common.annotations.VisibleForTesting;
-import org.apache.aurora.gen.MaintenanceMode;
import org.apache.aurora.scheduler.ResourceSlot;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
/**
@@ -105,8 +105,6 @@ public interface SchedulingFilter {
* Applies a task against the filter with the given resources, and on the host.
*
* @param offer Resources offered.
- * @param slaveHost Host that the resources are associated with.
- * @param mode Maintenance mode of the host that the resources are associated with.
* @param task Task.
* @param taskId Canonical ID of the task.
* @param attributeAggregate Attribute information for tasks in the job containing {@code task}.
@@ -115,8 +113,7 @@ public interface SchedulingFilter {
*/
Set<Veto> filter(
ResourceSlot offer,
- String slaveHost,
- MaintenanceMode mode,
+ IHostAttributes attributes,
ITaskConfig task,
String taskId,
AttributeAggregate attributeAggregate);
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
index 0533baa..cc6b53b 100644
--- a/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/filter/SchedulingFilterImpl.java
@@ -17,8 +17,6 @@ import java.util.Comparator;
import java.util.EnumSet;
import java.util.Set;
-import javax.inject.Inject;
-
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Optional;
@@ -33,16 +31,10 @@ import org.apache.aurora.gen.MaintenanceMode;
import org.apache.aurora.gen.TaskConstraint;
import org.apache.aurora.scheduler.ResourceSlot;
import org.apache.aurora.scheduler.configuration.ConfigurationManager;
-import org.apache.aurora.scheduler.storage.AttributeStore;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.Work.Quiet;
-import org.apache.aurora.scheduler.storage.entities.IAttribute;
import org.apache.aurora.scheduler.storage.entities.IConstraint;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import static java.util.Objects.requireNonNull;
-
import static org.apache.aurora.gen.MaintenanceMode.DRAINED;
import static org.apache.aurora.gen.MaintenanceMode.DRAINING;
import static org.apache.aurora.scheduler.configuration.ConfigurationManager.DEDICATED_ATTRIBUTE;
@@ -65,18 +57,6 @@ public class SchedulingFilterImpl implements SchedulingFilter {
private static final Set<MaintenanceMode> VETO_MODES = EnumSet.of(DRAINING, DRAINED);
- private final Storage storage;
-
- /**
- * Creates a new scheduling filter.
- *
- * @param storage Interface to accessing the task store.
- */
- @Inject
- public SchedulingFilterImpl(Storage storage) {
- this.storage = requireNonNull(storage);
- }
-
/**
* A function that may veto a task.
*/
@@ -187,7 +167,7 @@ public class SchedulingFilterImpl implements SchedulingFilter {
private FilterRule getConstraintFilter(
final AttributeAggregate jobState,
- final String slaveHost) {
+ final IHostAttributes offerAttributes) {
return new FilterRule() {
@Override
@@ -196,32 +176,23 @@ public class SchedulingFilterImpl implements SchedulingFilter {
return ImmutableList.of();
}
- // In the interest of performance, we perform a weakly consistent read here. The biggest
- // risk of this is that we might schedule against stale host attributes, or we might fail
- // to correctly satisfy a diversity constraint. Given that the likelihood is relatively low
- // for both of these, and the impact is also low, the weak consistency is acceptable.
- return storage.weaklyConsistentRead(new Quiet<Iterable<Veto>>() {
- @Override
- public Iterable<Veto> apply(final StoreProvider storeProvider) {
- ConstraintFilter constraintFilter = new ConstraintFilter(
- jobState,
- AttributeStore.Util.attributesOrNone(storeProvider, slaveHost));
- ImmutableList.Builder<Veto> vetoes = ImmutableList.builder();
- for (IConstraint constraint : VALUES_FIRST.sortedCopy(task.getConstraints())) {
- Optional<Veto> veto = constraintFilter.getVeto(constraint);
- if (veto.isPresent()) {
- vetoes.add(veto.get());
- if (isValueConstraint(constraint)) {
- // Break when a value constraint mismatch is found to avoid other
- // potentially-expensive operations to satisfy other constraints.
- break;
- }
- }
+ ConstraintFilter constraintFilter = new ConstraintFilter(
+ jobState,
+ offerAttributes.getAttributes());
+ ImmutableList.Builder<Veto> vetoes = ImmutableList.builder();
+ for (IConstraint constraint : VALUES_FIRST.sortedCopy(task.getConstraints())) {
+ Optional<Veto> veto = constraintFilter.getVeto(constraint);
+ if (veto.isPresent()) {
+ vetoes.add(veto.get());
+ if (isValueConstraint(constraint)) {
+ // Break when a value constraint mismatch is found to avoid other
+ // potentially-expensive operations to satisfy other constraints.
+ break;
}
-
- return vetoes.build();
}
- });
+ }
+
+ return vetoes.build();
}
};
}
@@ -240,38 +211,31 @@ public class SchedulingFilterImpl implements SchedulingFilter {
return builder.build();
}
- private boolean isDedicated(final String slaveHost) {
- Iterable<IAttribute> slaveAttributes =
- storage.weaklyConsistentRead(new Quiet<Iterable<IAttribute>>() {
- @Override
- public Iterable<IAttribute> apply(final StoreProvider storeProvider) {
- return AttributeStore.Util.attributesOrNone(storeProvider, slaveHost);
- }
- });
-
- return Iterables.any(slaveAttributes, new ConstraintFilter.NameFilter(DEDICATED_ATTRIBUTE));
+ private boolean isDedicated(IHostAttributes attributes) {
+ return Iterables.any(
+ attributes.getAttributes(),
+ new ConstraintFilter.NameFilter(DEDICATED_ATTRIBUTE));
}
@Override
public Set<Veto> filter(
ResourceSlot offer,
- String slaveHost,
- MaintenanceMode mode,
+ IHostAttributes attributes,
ITaskConfig task,
String taskId,
AttributeAggregate attributeAggregate) {
- if (!ConfigurationManager.isDedicated(task) && isDedicated(slaveHost)) {
+ if (!ConfigurationManager.isDedicated(task) && isDedicated(attributes)) {
return ImmutableSet.of(DEDICATED_HOST_VETO);
}
- Optional<Veto> maintenanceVeto = getMaintenanceVeto(mode);
+ Optional<Veto> maintenanceVeto = getMaintenanceVeto(attributes.getMode());
if (maintenanceVeto.isPresent()) {
return maintenanceVeto.asSet();
}
return ImmutableSet.<Veto>builder()
- .addAll(getConstraintFilter(attributeAggregate, slaveHost).apply(task))
+ .addAll(getConstraintFilter(attributeAggregate, attributes).apply(task))
.addAll(getResourceVetoes(offer, task))
.build();
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/http/Offers.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/Offers.java b/src/main/java/org/apache/aurora/scheduler/http/Offers.java
index 446dc74..6d75c3a 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/Offers.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/Offers.java
@@ -27,14 +27,14 @@ import com.google.common.base.Function;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableMap;
+import org.apache.aurora.scheduler.HostOffer;
import org.apache.aurora.scheduler.async.OfferQueue;
import org.apache.mesos.Protos.Attribute;
import org.apache.mesos.Protos.ExecutorID;
-import org.apache.mesos.Protos.Offer;
import org.apache.mesos.Protos.Resource;
import org.apache.mesos.Protos.Value.Range;
-import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer;
+import static org.apache.mesos.Protos.Offer;
/**
* Servlet that exposes resource offers that the scheduler is currently retaining.
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java
index ffcbc97..ffc30bb 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java
@@ -30,6 +30,7 @@ import com.twitter.common.inject.TimedInterceptor.Timed;
import com.twitter.common.stats.Stats;
import org.apache.aurora.GuiceUtils.AllowUnchecked;
+import org.apache.aurora.scheduler.HostOffer;
import org.apache.aurora.scheduler.TaskLauncher;
import org.apache.aurora.scheduler.base.SchedulerException;
import org.apache.aurora.scheduler.events.EventSink;
@@ -39,10 +40,10 @@ import org.apache.aurora.scheduler.storage.AttributeStore;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
import org.apache.aurora.scheduler.storage.Storage.MutateWork;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.mesos.Protos.ExecutorID;
import org.apache.mesos.Protos.FrameworkID;
import org.apache.mesos.Protos.MasterInfo;
-import org.apache.mesos.Protos.Offer;
import org.apache.mesos.Protos.OfferID;
import org.apache.mesos.Protos.SlaveID;
import org.apache.mesos.Protos.TaskStatus;
@@ -55,6 +56,8 @@ import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import static java.util.Objects.requireNonNull;
+import static org.apache.mesos.Protos.Offer;
+
/**
* Location for communication with mesos.
*/
@@ -90,7 +93,7 @@ class MesosSchedulerImpl implements Scheduler {
* @param storage Store to save host attributes into.
* @param lifecycle Application lifecycle manager.
* @param taskLaunchers Task launchers, which will be used in order. Calls to
- * {@link TaskLauncher#willUse(Offer)} and
+ * {@link TaskLauncher#willUse(HostOffer)} and
* {@link TaskLauncher#statusUpdate(TaskStatus)} are propagated to provided
* launchers, ceasing after the first match (based on a return value of
* {@code true}.
@@ -153,37 +156,30 @@ class MesosSchedulerImpl implements Scheduler {
public void resourceOffers(SchedulerDriver driver, final List<Offer> offers) {
Preconditions.checkState(isRegistered, "Must be registered before receiving offers.");
- // Store all host attributes in a single write operation to prevent other threads from
- // securing the storage lock between saves. We also save the host attributes before passing
- // offers elsewhere to ensure that host attributes are available before attempting to
- // schedule tasks associated with offers.
- // TODO(wfarner): Reconsider the requirements here, augment the task scheduler to skip over
- // offers when the host attributes cannot be found. (AURORA-137)
-
executor.execute(new Runnable() {
@Override
public void run() {
+ // TODO(wfarner): Reconsider the requirements here, augment the task scheduler to skip over
+ // offers when the host attributes cannot be found. (AURORA-137)
storage.write(new MutateWork.NoResult.Quiet() {
@Override
protected void execute(MutableStoreProvider storeProvider) {
- for (final Offer offer : offers) {
- storeProvider.getAttributeStore().saveHostAttributes(
- AttributeStore.Util.mergeOffer(storeProvider.getAttributeStore(), offer));
+ for (Offer offer : offers) {
+ IHostAttributes attributes =
+ AttributeStore.Util.mergeOffer(storeProvider.getAttributeStore(), offer);
+ storeProvider.getAttributeStore().saveHostAttributes(attributes);
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, String.format("Received offer: %s", offer));
+ }
+ totalResourceOffers.incrementAndGet();
+ for (TaskLauncher launcher : taskLaunchers) {
+ if (launcher.willUse(new HostOffer(offer, attributes))) {
+ break;
+ }
+ }
}
}
});
-
- for (Offer offer : offers) {
- if (LOG.isLoggable(Level.FINE)) {
- LOG.log(Level.FINE, String.format("Received offer: %s", offer));
- }
- totalResourceOffers.incrementAndGet();
- for (TaskLauncher launcher : taskLaunchers) {
- if (launcher.willUse(offer)) {
- break;
- }
- }
- }
}
});
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
index 077699f..86440eb 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
@@ -52,6 +52,7 @@ import static org.apache.aurora.gen.MaintenanceMode.DRAINING;
* All state-changing functions return their results. Additionally, all state-changing functions
* will ignore requests to change state of unknown hosts and subsequently omit these hosts from
* return values.
+ * TODO(wfarner): Convert use of HostStatus in this API to IHostStatus (immutable).
*/
public interface MaintenanceController {
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
index 9c9b659..77db411 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/TaskAssigner.java
@@ -20,6 +20,7 @@ import javax.inject.Inject;
import com.google.common.base.Optional;
+import org.apache.aurora.scheduler.HostOffer;
import org.apache.aurora.scheduler.ResourceSlot;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.configuration.Resources;
@@ -29,12 +30,11 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
import org.apache.aurora.scheduler.mesos.MesosTaskFactory;
import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.mesos.Protos.Offer;
import org.apache.mesos.Protos.TaskInfo;
import static java.util.Objects.requireNonNull;
-import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer;
+import static org.apache.mesos.Protos.Offer;
/**
* Responsible for matching a task against an offer.
@@ -45,13 +45,13 @@ public interface TaskAssigner {
* Tries to match a task against an offer. If a match is found, the assigner should
* make the appropriate changes to the task and provide a non-empty result.
*
- * @param hostOffer The resource offer.
+ * @param offer The resource offer.
* @param task The task to match against and optionally assign.
* @param attributeAggregate Attribute information for tasks in the job containing {@code task}.
* @return Instructions for launching the task if matching and assignment were successful.
*/
Optional<TaskInfo> maybeAssign(
- HostOffer hostOffer,
+ HostOffer offer,
IScheduledTask task,
AttributeAggregate attributeAggregate);
@@ -89,21 +89,20 @@ public interface TaskAssigner {
@Override
public Optional<TaskInfo> maybeAssign(
- HostOffer hostOffer,
+ HostOffer offer,
IScheduledTask task,
AttributeAggregate attributeAggregate) {
Set<Veto> vetoes = filter.filter(
- ResourceSlot.from(hostOffer.getOffer()),
- hostOffer.getOffer().getHostname(),
- hostOffer.getMode(),
+ ResourceSlot.from(offer.getOffer()),
+ offer.getAttributes(),
task.getAssignedTask().getTask(),
Tasks.id(task),
attributeAggregate);
if (vetoes.isEmpty()) {
- return Optional.of(assign(hostOffer.getOffer(), task));
+ return Optional.of(assign(offer.getOffer(), task));
} else {
- LOG.fine("Slave " + hostOffer.getOffer().getHostname() + " vetoed task " + Tasks.id(task)
+ LOG.fine("Slave " + offer.getOffer().getHostname() + " vetoed task " + Tasks.id(task)
+ ": " + vetoes);
return Optional.absent();
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java b/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
index 844a38a..1c9904c 100644
--- a/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/stats/AsyncStatsModule.java
@@ -35,6 +35,7 @@ import com.twitter.common.quantity.Data;
import com.twitter.common.quantity.Time;
import org.apache.aurora.gen.ResourceAggregate;
+import org.apache.aurora.scheduler.HostOffer;
import org.apache.aurora.scheduler.async.OfferQueue;
import org.apache.aurora.scheduler.base.AsyncUtil;
import org.apache.aurora.scheduler.base.Conversions;
@@ -49,8 +50,6 @@ import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import static java.util.Objects.requireNonNull;
-import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer;
-
/**
* Module to configure export of cluster-wide resource allocation and consumption statistics.
*/
@@ -115,13 +114,13 @@ public class AsyncStatsModule extends AbstractModule {
private static final Function<HostOffer, MachineResource> TO_RESOURCE =
new Function<HostOffer, MachineResource>() {
@Override
- public MachineResource apply(HostOffer hostOffer) {
- Resources resources = Resources.from(hostOffer.getOffer());
+ public MachineResource apply(HostOffer offer) {
+ Resources resources = Resources.from(offer.getOffer());
IResourceAggregate quota = IResourceAggregate.build(new ResourceAggregate()
.setNumCpus(resources.getNumCpus())
.setRamMb(resources.getRam().as(Data.MB))
.setDiskMb(resources.getDisk().as(Data.MB)));
- return new MachineResource(quota, Conversions.isDedicated(hostOffer.getOffer()));
+ return new MachineResource(quota, Conversions.isDedicated(offer.getOffer()));
}
};
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java b/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java
index 083a635..4673e80 100644
--- a/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java
@@ -22,14 +22,15 @@ import com.google.common.collect.Iterables;
import com.twitter.common.collections.Pair;
import com.twitter.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.gen.HostAttributes;
import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.scheduler.async.OfferQueue;
import org.apache.aurora.scheduler.configuration.Resources;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.storage.Storage.StorageException;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.mesos.Protos.Attribute;
import org.apache.mesos.Protos.FrameworkID;
-import org.apache.mesos.Protos.Offer;
import org.apache.mesos.Protos.OfferID;
import org.apache.mesos.Protos.Resource;
import org.apache.mesos.Protos.SlaveID;
@@ -47,6 +48,7 @@ import org.junit.Test;
import static org.apache.aurora.gen.ScheduleStatus.FAILED;
import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
import static org.apache.aurora.scheduler.configuration.ConfigurationManager.HOST_CONSTRAINT;
+import static org.apache.mesos.Protos.Offer;
import static org.easymock.EasyMock.expect;
import static org.junit.Assert.assertTrue;
@@ -60,7 +62,7 @@ public class UserTaskLauncherTest extends EasyMockTest {
private static final String TASK_ID_A = "task_id_a";
private static final OfferID OFFER_ID = OfferID.newBuilder().setValue("OfferId").build();
- private static final Offer OFFER = createOffer(SLAVE_ID, SLAVE_HOST_1, 4, 1024, 1024);
+ private static final HostOffer OFFER = createOffer(SLAVE_ID, SLAVE_HOST_1, 4, 1024, 1024);
private OfferQueue offerQueue;
private StateManager stateManager;
@@ -171,13 +173,13 @@ public class UserTaskLauncherTest extends EasyMockTest {
launcher.statusUpdate(status);
}
- private static Offer createOffer(SlaveID slave, String slaveHost, double cpu,
+ private static HostOffer createOffer(SlaveID slave, String slaveHost, double cpu,
double ramMb, double diskMb) {
return createOffer(slave, slaveHost, cpu, ramMb, diskMb,
ImmutableSet.<Pair<Integer, Integer>>of());
}
- private static Offer createOffer(SlaveID slave, String slaveHost, double cpu,
+ private static HostOffer createOffer(SlaveID slave, String slaveHost, double cpu,
double ramMb, double diskMb, Set<Pair<Integer, Integer>> ports) {
Ranges portRanges = Ranges.newBuilder()
@@ -189,7 +191,7 @@ public class UserTaskLauncherTest extends EasyMockTest {
}))
.build();
- return Offer.newBuilder()
+ Offer mesosOffer = Offer.newBuilder()
.addResources(Resource.newBuilder().setType(Type.SCALAR).setName(Resources.CPUS)
.setScalar(Scalar.newBuilder().setValue(cpu)))
.addResources(Resource.newBuilder().setType(Type.SCALAR).setName(Resources.RAM_MB)
@@ -206,5 +208,6 @@ public class UserTaskLauncherTest extends EasyMockTest {
.setFrameworkId(FrameworkID.newBuilder().setValue(FRAMEWORK_ID).build())
.setId(OFFER_ID)
.build();
+ return new HostOffer(mesosOffer, IHostAttributes.build(new HostAttributes()));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/test/java/org/apache/aurora/scheduler/async/GcExecutorLauncherTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/GcExecutorLauncherTest.java b/src/test/java/org/apache/aurora/scheduler/async/GcExecutorLauncherTest.java
index 758a8d4..422d5a9 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/GcExecutorLauncherTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/GcExecutorLauncherTest.java
@@ -30,20 +30,22 @@ import com.twitter.common.util.testing.FakeClock;
import org.apache.aurora.gen.AssignedTask;
import org.apache.aurora.gen.ExecutorConfig;
+import org.apache.aurora.gen.HostAttributes;
import org.apache.aurora.gen.Identity;
import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.gen.TaskConfig;
import org.apache.aurora.gen.comm.AdjustRetainedTasks;
+import org.apache.aurora.scheduler.HostOffer;
import org.apache.aurora.scheduler.async.GcExecutorLauncher.GcExecutorSettings;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.configuration.Resources;
import org.apache.aurora.scheduler.mesos.Driver;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
import org.apache.mesos.Protos.FrameworkID;
-import org.apache.mesos.Protos.Offer;
import org.apache.mesos.Protos.OfferID;
import org.apache.mesos.Protos.Resource;
import org.apache.mesos.Protos.SlaveID;
@@ -58,6 +60,7 @@ import static org.apache.aurora.gen.ScheduleStatus.FAILED;
import static org.apache.aurora.scheduler.async.GcExecutorLauncher.INSUFFICIENT_OFFERS_STAT_NAME;
import static org.apache.aurora.scheduler.async.GcExecutorLauncher.LOST_TASKS_STAT_NAME;
import static org.apache.aurora.scheduler.async.GcExecutorLauncher.SYSTEM_TASK_PREFIX;
+import static org.apache.mesos.Protos.Offer;
import static org.easymock.EasyMock.expect;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -67,13 +70,15 @@ public class GcExecutorLauncherTest extends EasyMockTest {
private static final String HOST = "slave-host";
- private static final Offer OFFER = Offer.newBuilder()
+ private static final Offer MESOS_OFFER = Offer.newBuilder()
.setSlaveId(SlaveID.newBuilder().setValue("slave-id"))
.setHostname(HOST)
.setFrameworkId(FrameworkID.newBuilder().setValue("framework-id").build())
.setId(OfferID.newBuilder().setValue("offer-id"))
.addAllResources(GcExecutorLauncher.TOTAL_GC_EXECUTOR_RESOURCES.toResourceList())
.build();
+ private static final HostOffer OFFER =
+ new HostOffer(MESOS_OFFER, IHostAttributes.build(new HostAttributes()));
private static final String JOB_A = "jobA";
private static final String TASK_UUID = "gc";
@@ -166,18 +171,19 @@ public class GcExecutorLauncherTest extends EasyMockTest {
Resources.subtract(
GcExecutorLauncher.TOTAL_GC_EXECUTOR_RESOURCES,
GcExecutorLauncher.EPSILON).toResourceList();
- Offer smallOffer = OFFER.toBuilder()
+ Offer smallOffer = MESOS_OFFER.toBuilder()
.clearResources()
.addAllResources(resources)
.build();
assertEquals(0, insufficientOffers.get());
- assertFalse(gcExecutorLauncher.willUse(smallOffer));
+ assertFalse(gcExecutorLauncher.willUse(
+ new HostOffer(smallOffer, IHostAttributes.build(new HostAttributes()))));
assertEquals(1, insufficientOffers.get());
}
private static TaskStatus makeStatus(String taskId) {
return TaskStatus.newBuilder()
- .setSlaveId(OFFER.getSlaveId())
+ .setSlaveId(OFFER.getOffer().getSlaveId())
.setState(TaskState.TASK_RUNNING)
.setTaskId(TaskID.newBuilder().setValue(taskId))
.build();
@@ -220,8 +226,12 @@ public class GcExecutorLauncherTest extends EasyMockTest {
Maps.transformValues(Tasks.mapById(ImmutableSet.copyOf(tasks)), Tasks.GET_STATUS);
AdjustRetainedTasks message = new AdjustRetainedTasks().setRetainedTasks(statuses);
TaskInfo task = GcExecutorLauncher.makeGcTask(
- HOST, OFFER.getSlaveId(), SETTINGS.getGcExecutorPath().get(), TASK_UUID, message);
- driver.launchTask(OFFER.getId(), task);
+ HOST,
+ OFFER.getOffer().getSlaveId(),
+ SETTINGS.getGcExecutorPath().get(),
+ TASK_UUID,
+ message);
+ driver.launchTask(OFFER.getOffer().getId(), task);
}
private IScheduledTask makeTask(String jobName, ScheduleStatus status) {
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/test/java/org/apache/aurora/scheduler/async/OfferQueueImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/OfferQueueImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/OfferQueueImplTest.java
index e2a198a..4cf602a 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/OfferQueueImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/OfferQueueImplTest.java
@@ -13,154 +13,124 @@
*/
package org.apache.aurora.scheduler.async;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
import com.google.common.base.Function;
import com.google.common.base.Optional;
-import com.google.common.base.Throwables;
import com.google.common.testing.TearDown;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;
import com.twitter.common.testing.easymock.EasyMockTest;
-import com.twitter.common.util.concurrent.ExecutorServiceShutdown;
+import org.apache.aurora.gen.HostAttributes;
import org.apache.aurora.gen.MaintenanceMode;
-import org.apache.aurora.scheduler.async.OfferQueue.LaunchException;
+import org.apache.aurora.scheduler.HostOffer;
import org.apache.aurora.scheduler.async.OfferQueue.OfferQueueImpl;
import org.apache.aurora.scheduler.async.OfferQueue.OfferReturnDelay;
import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
import org.apache.aurora.scheduler.mesos.Driver;
-import org.apache.aurora.scheduler.state.MaintenanceController;
-import org.apache.mesos.Protos.Offer;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.testing.FakeScheduledExecutor;
import org.apache.mesos.Protos.TaskInfo;
-import org.easymock.IAnswer;
import org.junit.Before;
import org.junit.Test;
-import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer;
+import static org.apache.aurora.gen.MaintenanceMode.DRAINING;
+import static org.apache.aurora.gen.MaintenanceMode.NONE;
import static org.easymock.EasyMock.expect;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
public class OfferQueueImplTest extends EasyMockTest {
- private static final Amount<Integer, Time> RETURN_DELAY = Amount.of(1, Time.DAYS);
+ private static final Amount<Long, Time> RETURN_DELAY = Amount.of(1L, Time.DAYS);
private static final String HOST_A = "HOST_A";
- private static final Offer OFFER_A = Offers.makeOffer("OFFER_A", HOST_A);
+ private static final HostOffer OFFER_A = new HostOffer(
+ Offers.makeOffer("OFFER_A", HOST_A),
+ IHostAttributes.build(new HostAttributes().setMode(NONE)));
private static final String HOST_B = "HOST_B";
- private static final Offer OFFER_B = Offers.makeOffer("OFFER_B", HOST_B);
+ private static final HostOffer OFFER_B = new HostOffer(
+ Offers.makeOffer("OFFER_B", HOST_B),
+ IHostAttributes.build(new HostAttributes().setMode(NONE)));
private static final String HOST_C = "HOST_C";
- private static final Offer OFFER_C = Offers.makeOffer("OFFER_C", HOST_C);
+ private static final HostOffer OFFER_C = new HostOffer(
+ Offers.makeOffer("OFFER_C", HOST_C),
+ IHostAttributes.build(new HostAttributes().setMode(NONE)));
private Driver driver;
- private ScheduledExecutorService executor;
- private ExecutorService testExecutor;
- private MaintenanceController maintenanceController;
+ private FakeScheduledExecutor clock;
private Function<HostOffer, Optional<TaskInfo>> offerAcceptor;
private OfferQueueImpl offerQueue;
@Before
public void setUp() {
driver = createMock(Driver.class);
- ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).build();
- executor = Executors.newSingleThreadScheduledExecutor(threadFactory);
- testExecutor = Executors.newCachedThreadPool(threadFactory);
+ ScheduledExecutorService executorMock = createMock(ScheduledExecutorService.class);
+ clock = FakeScheduledExecutor.scheduleExecutor(executorMock);
+
addTearDown(new TearDown() {
@Override
public void tearDown() throws Exception {
- new ExecutorServiceShutdown(executor, Amount.of(1L, Time.SECONDS)).execute();
- new ExecutorServiceShutdown(testExecutor, Amount.of(1L, Time.SECONDS)).execute();
+ clock.assertEmpty();
}
});
- maintenanceController = createMock(MaintenanceController.class);
offerAcceptor = createMock(new Clazz<Function<HostOffer, Optional<TaskInfo>>>() { });
OfferReturnDelay returnDelay = new OfferReturnDelay() {
@Override
- public Amount<Integer, Time> get() {
+ public Amount<Long, Time> get() {
return RETURN_DELAY;
}
};
- offerQueue = new OfferQueueImpl(driver, returnDelay, executor, maintenanceController);
+ offerQueue = new OfferQueueImpl(driver, returnDelay, executorMock);
}
@Test
- public void testNoDeadlock() throws Exception {
- // Test that a blocked call to maintenanceController does not result in a deadlock between
- // the intrinsic lock and the storage lock.
- final CountDownLatch launchAttempted = new CountDownLatch(1);
- expect(maintenanceController.getMode(HOST_A)).andAnswer(new IAnswer<MaintenanceMode>() {
- @Override
- public MaintenanceMode answer() throws InterruptedException {
- launchAttempted.await();
- return MaintenanceMode.NONE;
- }
- });
-
- control.replay();
+ public void testOffersSorted() throws Exception {
+ // Ensures that non-DRAINING offers are preferred - the DRAINING offer would be tried last.
- final CountDownLatch offerAdded = new CountDownLatch(1);
- testExecutor.submit(new Runnable() {
- @Override
- public void run() {
- offerQueue.addOffer(OFFER_A);
- offerAdded.countDown();
- }
- });
- testExecutor.submit(new Runnable() {
- @Override
- public void run() {
- try {
- offerQueue.launchFirst(offerAcceptor);
- launchAttempted.countDown();
- } catch (LaunchException e) {
- throw Throwables.propagate(e);
- }
- }
- });
+ HostOffer offerA = setMode(OFFER_A, DRAINING);
+ HostOffer offerC = setMode(OFFER_C, DRAINING);
- launchAttempted.await();
- offerAdded.await();
- }
+ TaskInfo task = TaskInfo.getDefaultInstance();
+ expect(offerAcceptor.apply(OFFER_B)).andReturn(Optional.of(task));
+ driver.launchTask(OFFER_B.getOffer().getId(), task);
- @Test
- public void testOffersSorted() throws Exception {
- MaintenanceMode modeA = MaintenanceMode.NONE;
- MaintenanceMode modeB = MaintenanceMode.DRAINING;
- MaintenanceMode modeC = MaintenanceMode.NONE;
+ driver.declineOffer(offerA.getOffer().getId());
+ driver.declineOffer(offerC.getOffer().getId());
- HostOffer hostOfferA = new HostOffer(OFFER_A, modeA);
- HostOffer hostOfferB = new HostOffer(OFFER_B, modeB);
- HostOffer hostOfferC = new HostOffer(OFFER_C, modeC);
+ control.replay();
- expect(maintenanceController.getMode(HOST_A)).andReturn(modeA);
- expect(maintenanceController.getMode(HOST_B)).andReturn(modeB);
- expect(maintenanceController.getMode(HOST_C)).andReturn(modeC);
- expect(offerAcceptor.apply(hostOfferA)).andReturn(Optional.<TaskInfo>absent());
- expect(offerAcceptor.apply(hostOfferB)).andReturn(Optional.<TaskInfo>absent());
- expect(offerAcceptor.apply(hostOfferC)).andReturn(Optional.<TaskInfo>absent());
+ offerQueue.addOffer(offerA);
+ offerQueue.addOffer(OFFER_B);
+ offerQueue.addOffer(offerC);
+ assertTrue(offerQueue.launchFirst(offerAcceptor));
+ clock.advance(RETURN_DELAY);
+ }
+ @Test
+ public void testFlushOffers() throws Exception {
control.replay();
offerQueue.addOffer(OFFER_A);
offerQueue.addOffer(OFFER_B);
- offerQueue.addOffer(OFFER_C);
+ offerQueue.driverDisconnected(new DriverDisconnected());
assertFalse(offerQueue.launchFirst(offerAcceptor));
+ clock.advance(RETURN_DELAY);
}
@Test
- public void testFlushOffers() throws Exception {
- expect(maintenanceController.getMode(HOST_A)).andReturn(MaintenanceMode.NONE);
- expect(maintenanceController.getMode(HOST_B)).andReturn(MaintenanceMode.NONE);
+ public void testDeclineOffer() throws Exception {
+ driver.declineOffer(OFFER_A.getOffer().getId());
control.replay();
offerQueue.addOffer(OFFER_A);
- offerQueue.addOffer(OFFER_B);
- offerQueue.driverDisconnected(new DriverDisconnected());
- assertFalse(offerQueue.launchFirst(offerAcceptor));
+ clock.advance(RETURN_DELAY);
+ }
+
+ private static HostOffer setMode(HostOffer offer, MaintenanceMode mode) {
+ return new HostOffer(
+ offer.getOffer(),
+ IHostAttributes.build(offer.getAttributes().newBuilder().setMode(mode)));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
index c0fa462..59bfbcb 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/PreemptorImplTest.java
@@ -28,6 +28,7 @@ import com.google.common.collect.Lists;
import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Data;
import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.StatsProvider;
import com.twitter.common.testing.easymock.EasyMockTest;
import com.twitter.common.util.testing.FakeClock;
@@ -42,6 +43,7 @@ import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.gen.TaskConfig;
import org.apache.aurora.gen.TaskEvent;
+import org.apache.aurora.scheduler.HostOffer;
import org.apache.aurora.scheduler.ResourceSlot;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.Tasks;
@@ -49,7 +51,6 @@ import org.apache.aurora.scheduler.configuration.Resources;
import org.apache.aurora.scheduler.filter.AttributeAggregate;
import org.apache.aurora.scheduler.filter.SchedulingFilter;
import org.apache.aurora.scheduler.filter.SchedulingFilterImpl;
-import org.apache.aurora.scheduler.state.MaintenanceController;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.storage.AttributeStore;
import org.apache.aurora.scheduler.storage.Storage;
@@ -60,6 +61,7 @@ import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
import org.apache.aurora.scheduler.storage.mem.MemStorage;
import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.apache.aurora.scheduler.testing.FakeStatsProvider;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.easymock.IExpectationSetters;
@@ -70,7 +72,6 @@ import static org.apache.aurora.gen.MaintenanceMode.NONE;
import static org.apache.aurora.gen.ScheduleStatus.PENDING;
import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
-import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer;
import static org.apache.aurora.scheduler.async.Preemptor.PreemptorImpl;
import static org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
import static org.apache.mesos.Protos.Offer;
@@ -103,7 +104,7 @@ public class PreemptorImplTest extends EasyMockTest {
private StateManager stateManager;
private SchedulingFilter schedulingFilter;
private FakeClock clock;
- private MaintenanceController maintenance;
+ private StatsProvider statsProvider;
private OfferQueue offerQueue;
private AttributeAggregate emptyJob;
@@ -112,8 +113,8 @@ public class PreemptorImplTest extends EasyMockTest {
storageUtil = new StorageTestUtil(this);
storageUtil.expectOperations();
stateManager = createMock(StateManager.class);
- maintenance = createMock(MaintenanceController.class);
clock = new FakeClock();
+ statsProvider = new FakeStatsProvider();
offerQueue = createMock(OfferQueue.class);
emptyJob = new AttributeAggregate(
Suppliers.ofInstance(ImmutableSet.<IScheduledTask>of()),
@@ -128,7 +129,7 @@ public class PreemptorImplTest extends EasyMockTest {
schedulingFilter,
PREEMPTION_DELAY,
clock,
- maintenance);
+ statsProvider);
preemptor.findPreemptionSlotFor(pendingTask.getAssignedTask().getTaskId(), emptyJob);
}
@@ -150,12 +151,10 @@ public class PreemptorImplTest extends EasyMockTest {
IScheduledTask.setFromBuilders(Arrays.asList(returnedTasks)));
}
- private void expectGetMaintenance(String host) {
- expect(maintenance.getMode(host)).andReturn(MaintenanceMode.NONE);
- }
-
@Test
public void testPreempted() throws Exception {
+ setUpHost(HOST_A, RACK_A);
+
schedulingFilter = createMock(SchedulingFilter.class);
ScheduledTask lowPriority = makeTask(USER_A, JOB_A, TASK_ID_A);
runOnHost(lowPriority, HOST_A);
@@ -168,8 +167,6 @@ public class PreemptorImplTest extends EasyMockTest {
expectGetPendingTasks(highPriority);
expectGetActiveTasks(lowPriority);
- expectGetMaintenance(HOST_A);
-
expectFiltering();
expectPreempted(lowPriority);
@@ -179,6 +176,8 @@ public class PreemptorImplTest extends EasyMockTest {
@Test
public void testLowestPriorityPreempted() throws Exception {
+ setUpHost(HOST_A, RACK_A);
+
schedulingFilter = createMock(SchedulingFilter.class);
ScheduledTask lowPriority = makeTask(USER_A, JOB_A, TASK_ID_A, 10);
runOnHost(lowPriority, HOST_A);
@@ -194,7 +193,6 @@ public class PreemptorImplTest extends EasyMockTest {
expectGetPendingTasks(highPriority);
expectGetActiveTasks(lowerPriority, lowerPriority);
- expectGetMaintenance(HOST_A);
expectFiltering();
expectPreempted(lowerPriority);
@@ -204,6 +202,8 @@ public class PreemptorImplTest extends EasyMockTest {
@Test
public void testOnePreemptableTask() throws Exception {
+ setUpHost(HOST_A, RACK_A);
+
schedulingFilter = createMock(SchedulingFilter.class);
ScheduledTask highPriority = makeTask(USER_A, JOB_A, TASK_ID_A, 100);
runOnHost(highPriority, HOST_A);
@@ -222,7 +222,6 @@ public class PreemptorImplTest extends EasyMockTest {
expectGetPendingTasks(pendingPriority);
expectGetActiveTasks(highPriority, lowerPriority, lowestPriority);
- expectGetMaintenance(HOST_A);
expectFiltering();
expectPreempted(lowestPriority);
@@ -250,6 +249,8 @@ public class PreemptorImplTest extends EasyMockTest {
@Test
public void testProductionPreemptingNonproduction() throws Exception {
+ setUpHost(HOST_A, RACK_A);
+
schedulingFilter = createMock(SchedulingFilter.class);
// Use a very low priority for the production task to show that priority is irrelevant.
ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1", -1000);
@@ -263,7 +264,6 @@ public class PreemptorImplTest extends EasyMockTest {
expectGetPendingTasks(p1);
expectGetActiveTasks(a1);
- expectGetMaintenance(HOST_A);
expectFiltering();
expectPreempted(a1);
@@ -273,6 +273,8 @@ public class PreemptorImplTest extends EasyMockTest {
@Test
public void testProductionPreemptingNonproductionAcrossUsers() throws Exception {
+ setUpHost(HOST_A, RACK_A);
+
schedulingFilter = createMock(SchedulingFilter.class);
// Use a very low priority for the production task to show that priority is irrelevant.
ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1", -1000);
@@ -286,7 +288,6 @@ public class PreemptorImplTest extends EasyMockTest {
expectGetPendingTasks(p1);
expectGetActiveTasks(a1);
- expectGetMaintenance(HOST_A);
expectFiltering();
expectPreempted(a1);
@@ -315,7 +316,7 @@ public class PreemptorImplTest extends EasyMockTest {
// Ensures a production task can preempt 2 tasks on the same host.
@Test
public void testProductionPreemptingManyNonProduction() throws Exception {
- schedulingFilter = new SchedulingFilterImpl(storageUtil.storage);
+ schedulingFilter = new SchedulingFilterImpl();
ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
a1.getAssignedTask().getTask().setNumCpus(1).setRamMb(512);
@@ -347,7 +348,7 @@ public class PreemptorImplTest extends EasyMockTest {
// Ensures we select the minimal number of tasks to preempt
@Test
public void testMinimalSetPreempted() throws Exception {
- schedulingFilter = new SchedulingFilterImpl(storageUtil.storage);
+ schedulingFilter = new SchedulingFilterImpl();
ScheduledTask a1 = makeTask(USER_A, JOB_A, TASK_ID_A + "_a1");
a1.getAssignedTask().getTask().setNumCpus(4).setRamMb(4096);
@@ -382,7 +383,7 @@ public class PreemptorImplTest extends EasyMockTest {
// Ensures a production task *never* preempts a production task from another job.
@Test
public void testProductionJobNeverPreemptsProductionJob() throws Exception {
- schedulingFilter = new SchedulingFilterImpl(storageUtil.storage);
+ schedulingFilter = new SchedulingFilterImpl();
ScheduledTask p1 = makeProductionTask(USER_A, JOB_A, TASK_ID_A + "_p1");
p1.getAssignedTask().getTask().setNumCpus(2).setRamMb(1024);
@@ -407,7 +408,7 @@ public class PreemptorImplTest extends EasyMockTest {
// Ensures that we can preempt if a task + offer can satisfy a pending task.
@Test
public void testPreemptWithOfferAndTask() throws Exception {
- schedulingFilter = new SchedulingFilterImpl(storageUtil.storage);
+ schedulingFilter = new SchedulingFilterImpl();
setUpHost(HOST_A, RACK_A);
@@ -435,7 +436,7 @@ public class PreemptorImplTest extends EasyMockTest {
// Ensures we can preempt if two tasks and an offer can satisfy a pending task.
@Test
public void testPreemptWithOfferAndMultipleTasks() throws Exception {
- schedulingFilter = new SchedulingFilterImpl(storageUtil.storage);
+ schedulingFilter = new SchedulingFilterImpl();
setUpHost(HOST_A, RACK_A);
@@ -468,7 +469,7 @@ public class PreemptorImplTest extends EasyMockTest {
// Ensures we don't preempt if a host has enough slack to satisfy a pending task.
@Test
public void testPreemptWithLargeOffer() throws Exception {
- schedulingFilter = new SchedulingFilterImpl(storageUtil.storage);
+ schedulingFilter = new SchedulingFilterImpl();
setUpHost(HOST_A, RACK_A);
@@ -524,7 +525,7 @@ public class PreemptorImplTest extends EasyMockTest {
schedulingFilter,
PREEMPTION_DELAY,
clock,
- maintenance);
+ statsProvider);
assertEquals(
Optional.<String>absent(),
@@ -556,7 +557,9 @@ public class PreemptorImplTest extends EasyMockTest {
.transform(new Function<Offer, HostOffer>() {
@Override
public HostOffer apply(Offer offer) {
- return new HostOffer(offer, MaintenanceMode.NONE);
+ return new HostOffer(
+ offer,
+ IHostAttributes.build(new HostAttributes().setMode(MaintenanceMode.NONE)));
}
});
expect(offerQueue.getOffers()).andReturn(hostOffers);
@@ -569,8 +572,7 @@ public class PreemptorImplTest extends EasyMockTest {
private IExpectationSetters<Set<Veto>> expectFiltering() {
return expect(schedulingFilter.filter(
EasyMock.<ResourceSlot>anyObject(),
- EasyMock.eq(HOST_A),
- EasyMock.eq(MaintenanceMode.NONE),
+ EasyMock.<IHostAttributes>anyObject(),
EasyMock.<ITaskConfig>anyObject(),
EasyMock.<String>anyObject(),
EasyMock.eq(emptyJob))).andAnswer(
@@ -659,6 +661,5 @@ public class PreemptorImplTest extends EasyMockTest {
expect(this.storageUtil.attributeStore.getHostAttributes(host))
.andReturn(Optional.of(hostAttrs)).anyTimes();
- expect(this.maintenance.getMode(host)).andReturn(NONE).anyTimes();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/b80e69c9/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
index 0e699c9..0e8a98c 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskSchedulerImplTest.java
@@ -29,11 +29,13 @@ import com.twitter.common.util.Clock;
import com.twitter.common.util.testing.FakeClock;
import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.HostAttributes;
import org.apache.aurora.gen.Identity;
import org.apache.aurora.gen.JobKey;
import org.apache.aurora.gen.MaintenanceMode;
import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.HostOffer;
import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.Tasks;
@@ -48,6 +50,7 @@ import org.apache.aurora.scheduler.storage.AttributeStore;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
import org.apache.aurora.scheduler.storage.Storage.MutateWork;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.mem.MemStorage;
import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
@@ -58,7 +61,6 @@ import org.junit.Test;
import static org.apache.aurora.gen.ScheduleStatus.PENDING;
import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
-import static org.apache.aurora.scheduler.async.OfferQueue.HostOffer;
import static org.easymock.EasyMock.capture;
import static org.easymock.EasyMock.expect;
import static org.junit.Assert.assertEquals;
@@ -69,8 +71,9 @@ public class TaskSchedulerImplTest extends EasyMockTest {
private static final IScheduledTask TASK_A = makeTask("a");
private static final IScheduledTask TASK_B = makeTask("b");
- private static final HostOffer OFFER =
- new HostOffer(Offers.makeOffer("OFFER_A", "HOST_A"), MaintenanceMode.NONE);
+ private static final HostOffer OFFER = new HostOffer(
+ Offers.makeOffer("OFFER_A", "HOST_A"),
+ IHostAttributes.build(new HostAttributes().setMode(MaintenanceMode.NONE)));
private StorageTestUtil storageUtil;
private StateManager stateManager;