You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2017/10/20 02:38:58 UTC
aurora git commit: Provide a formal way to disable offer declining
Repository: aurora
Updated Branches:
refs/heads/master 53970139d -> a93538966
Provide a formal way to disable offer declining
Increasing the offer hold time to effectively disable offer declines is a trap,
as the queue of asynchronous declines will grow without bound. This introduces
a command line argument to explicitly disable declining.
Reviewed at https://reviews.apache.org/r/63157/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/a9353896
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/a9353896
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/a9353896
Branch: refs/heads/master
Commit: a935389662b72486f49e507493557d360dc97178
Parents: 5397013
Author: Bill Farner <wf...@apache.org>
Authored: Thu Oct 19 19:39:02 2017 -0700
Committer: Bill Farner <wf...@apache.org>
Committed: Thu Oct 19 19:39:02 2017 -0700
----------------------------------------------------------------------
RELEASE-NOTES.md | 5 +-
.../aurora/benchmark/SchedulingBenchmarks.java | 7 +-
.../aurora/scheduler/offers/Deferment.java | 70 +++++++
.../aurora/scheduler/offers/OfferManager.java | 54 +++--
.../aurora/scheduler/offers/OfferSettings.java | 25 +--
.../aurora/scheduler/offers/OffersModule.java | 46 +++--
.../scheduler/config/CommandLineTest.java | 2 +
.../scheduler/offers/OfferManagerImplTest.java | 205 +++++++------------
8 files changed, 222 insertions(+), 192 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/a9353896/RELEASE-NOTES.md
----------------------------------------------------------------------
diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index f4cc416..1ec6d74 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -16,7 +16,10 @@
- Added the `thrift_method_interceptor_modules` scheduler flag that lets cluster operators inject
custom Thrift method interceptors.
- Increase default ZooKeeper session timeout from 4 to 15 seconds.
-- Add option `-zk_connection_timeout` to control the connection timeout of ZooKeeper connections.
+- Added option `-zk_connection_timeout` to control the connection timeout of ZooKeeper connections.
+- Added scheduler command line argument `-hold_offers_forever`, suitable for use in clusters where
+ Aurora is the only framework. This setting disables other options such as `-min_offer_hold_time`,
+ and allows the scheduler to more efficiently cache scheduling attempts.
### Deprecations and removals:
http://git-wip-us.apache.org/repos/asf/aurora/blob/a9353896/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
index 7d37668..5a9099b 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
@@ -55,6 +55,7 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter;
import org.apache.aurora.scheduler.filter.SchedulingFilterImpl;
import org.apache.aurora.scheduler.mesos.Driver;
import org.apache.aurora.scheduler.mesos.TestExecutorSettings;
+import org.apache.aurora.scheduler.offers.Deferment;
import org.apache.aurora.scheduler.offers.OfferManager;
import org.apache.aurora.scheduler.offers.OfferOrder;
import org.apache.aurora.scheduler.offers.OfferSettings;
@@ -148,13 +149,11 @@ public class SchedulingBenchmarks {
// No-op.
}
});
+ bind(Deferment.class).to(Deferment.Noop.class);
bind(OfferManager.class).to(OfferManager.OfferManagerImpl.class);
bind(OfferManager.OfferManagerImpl.class).in(Singleton.class);
bind(OfferSettings.class).toInstance(
- new OfferSettings(
- NO_DELAY,
- () -> DELAY_FOREVER,
- ImmutableList.of(OfferOrder.RANDOM)));
+ new OfferSettings(NO_DELAY, ImmutableList.of(OfferOrder.RANDOM)));
bind(BiCache.BiCacheSettings.class).toInstance(
new BiCache.BiCacheSettings(DELAY_FOREVER, ""));
bind(TaskScheduler.class).to(TaskScheduler.TaskSchedulerImpl.class);
http://git-wip-us.apache.org/repos/asf/aurora/blob/a9353896/src/main/java/org/apache/aurora/scheduler/offers/Deferment.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/Deferment.java b/src/main/java/org/apache/aurora/scheduler/offers/Deferment.java
new file mode 100644
index 0000000..f3ec886
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/offers/Deferment.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.offers;
+
+import javax.inject.Inject;
+
+import com.google.common.base.Supplier;
+
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
+import org.apache.aurora.scheduler.async.DelayExecutor;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Determines if and when a deferred action should be performed.
+ */
+public interface Deferment {
+
+ /**
+ * Defers an action to possibly be performed at some point in the future.
+ *
+ * @param action Callback to perform the deferred action.
+ */
+ void defer(Runnable action);
+
+ /**
+ * Never performs deferred actions.
+ */
+ class Noop implements Deferment {
+ @Override
+ public void defer(Runnable action) {
+ // no-op
+ }
+ }
+
+ /**
+ * Performs a deferred action after a dynamic delay.
+ */
+ class DelayedDeferment implements Deferment {
+ private final Supplier<Amount<Long, Time>> delay;
+ private final DelayExecutor executor;
+
+ @Inject
+ public DelayedDeferment(
+ Supplier<Amount<Long, Time>> delay,
+ @AsyncExecutor DelayExecutor executor) {
+
+ this.delay = requireNonNull(delay);
+ this.executor = requireNonNull(executor);
+ }
+
+ @Override
+ public void defer(Runnable action) {
+ executor.execute(action, delay.get());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/a9353896/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
index e833431..7011a4c 100644
--- a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
@@ -37,8 +37,6 @@ import org.apache.aurora.common.inject.TimedInterceptor.Timed;
import org.apache.aurora.common.quantity.Time;
import org.apache.aurora.common.stats.StatsProvider;
import org.apache.aurora.scheduler.HostOffer;
-import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
-import org.apache.aurora.scheduler.async.DelayExecutor;
import org.apache.aurora.scheduler.base.TaskGroupKey;
import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
@@ -165,7 +163,7 @@ public interface OfferManager extends EventSubscriber {
private final Driver driver;
private final OfferSettings offerSettings;
- private final DelayExecutor executor;
+ private final Deferment offerDecline;
@Inject
@VisibleForTesting
@@ -173,36 +171,28 @@ public interface OfferManager extends EventSubscriber {
Driver driver,
OfferSettings offerSettings,
StatsProvider statsProvider,
- @AsyncExecutor DelayExecutor executor) {
+ Deferment offerDecline) {
this.driver = requireNonNull(driver);
this.offerSettings = requireNonNull(offerSettings);
- this.executor = requireNonNull(executor);
this.hostOffers = new HostOffers(statsProvider, offerSettings.getOfferOrder());
this.offerRaces = statsProvider.makeCounter(OFFER_ACCEPT_RACES);
this.offerCancelFailures = statsProvider.makeCounter(OFFER_CANCEL_FAILURES);
+ this.offerDecline = requireNonNull(offerDecline);
}
@Override
- public void addOffer(final HostOffer offer) {
- // We run a slight risk of a race here, which is acceptable. The worst case is that we
- // temporarily hold two offers for the same host, which should be corrected when we return
- // them after the return delay.
- // There's also a chance that we return an offer for compaction ~simultaneously with the
- // same-host offer being cancelled/returned. This is also fine.
- Optional<HostOffer> sameSlave = hostOffers.get(offer.getOffer().getAgentId());
- if (sameSlave.isPresent()) {
- // If there are existing offers for the slave, decline all of them so the master can
- // compact all of those offers into a single offer and send them back.
+ public void addOffer(HostOffer offer) {
+ Optional<HostOffer> sameAgent = hostOffers.addAndPreventAgentCollision(offer);
+ if (sameAgent.isPresent()) {
+ // We have an existing offer for the same agent. We choose to return both offers so that
+ // they may be combined into a single offer.
LOG.info("Returning offers for " + offer.getOffer().getAgentId().getValue()
+ " for compaction.");
decline(offer.getOffer().getId());
- removeAndDecline(sameSlave.get().getOffer().getId());
+ decline(sameAgent.get().getOffer().getId());
} else {
- hostOffers.add(offer);
- executor.execute(
- () -> removeAndDecline(offer.getOffer().getId()),
- offerSettings.getOfferReturnDelay());
+ offerDecline.defer(() -> removeAndDecline(offer.getOffer().getId()));
}
}
@@ -326,7 +316,27 @@ public interface OfferManager extends EventSubscriber {
return Optional.of(offer);
}
- synchronized void add(HostOffer offer) {
+ /**
+ * Adds an offer while maintaining a guarantee that no two offers may exist with the same
+ * agent ID. If an offer exists with the same agent ID, the existing offer is removed
+ * and returned, and {@code offer} is not added.
+ *
+ * @param offer Offer to add.
+ * @return The pre-existing offer with the same agent ID as {@code offer}, if one exists,
+ * which will also be removed prior to returning.
+ */
+ synchronized Optional<HostOffer> addAndPreventAgentCollision(HostOffer offer) {
+ HostOffer sameAgent = offersBySlave.get(offer.getOffer().getAgentId());
+ if (sameAgent != null) {
+ remove(sameAgent.getOffer().getId());
+ return Optional.of(sameAgent);
+ }
+
+ addInternal(offer);
+ return Optional.absent();
+ }
+
+ private void addInternal(HostOffer offer) {
offers.add(offer);
offersById.put(offer.getOffer().getId(), offer);
offersBySlave.put(offer.getOffer().getAgentId(), offer);
@@ -350,7 +360,7 @@ public interface OfferManager extends EventSubscriber {
if (offer != null) {
// Remove and re-add a host's offer to re-sort based on its new hostStatus
remove(offer.getOffer().getId());
- add(new HostOffer(offer.getOffer(), attributes));
+ addInternal(new HostOffer(offer.getOffer(), attributes));
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/a9353896/src/main/java/org/apache/aurora/scheduler/offers/OfferSettings.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OfferSettings.java b/src/main/java/org/apache/aurora/scheduler/offers/OfferSettings.java
index 4c6fd54..e060f20 100644
--- a/src/main/java/org/apache/aurora/scheduler/offers/OfferSettings.java
+++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferSettings.java
@@ -15,7 +15,6 @@ package org.apache.aurora.scheduler.offers;
import java.util.List;
-import com.google.common.base.Supplier;
import com.google.common.collect.Ordering;
import org.apache.aurora.common.quantity.Amount;
@@ -30,24 +29,14 @@ import static java.util.Objects.requireNonNull;
public class OfferSettings {
private final Amount<Long, Time> offerFilterDuration;
- private final Supplier<Amount<Long, Time>> returnDelaySupplier;
private final Ordering<HostOffer> offerOrder;
- public OfferSettings(
- Amount<Long, Time> offerFilterDuration,
- Supplier<Amount<Long, Time>> returnDelaySupplier,
- List<OfferOrder> offerOrder) {
-
- this(offerFilterDuration, returnDelaySupplier, OfferOrderBuilder.create(offerOrder));
+ public OfferSettings(Amount<Long, Time> offerFilterDuration, List<OfferOrder> offerOrder) {
+ this(offerFilterDuration, OfferOrderBuilder.create(offerOrder));
}
- OfferSettings(
- Amount<Long, Time> offerFilterDuration,
- Supplier<Amount<Long, Time>> returnDelaySupplier,
- Ordering<HostOffer> offerOrder) {
-
+ OfferSettings(Amount<Long, Time> offerFilterDuration, Ordering<HostOffer> offerOrder) {
this.offerFilterDuration = requireNonNull(offerFilterDuration);
- this.returnDelaySupplier = requireNonNull(returnDelaySupplier);
this.offerOrder = requireNonNull(offerOrder);
}
@@ -59,14 +48,6 @@ public class OfferSettings {
}
/**
- * The amount of time after which an unused offer should be 'returned' to Mesos by declining it.
- * The delay is calculated for each offer using a random duration within a fixed window.
- */
- public Amount<Long, Time> getOfferReturnDelay() {
- return returnDelaySupplier.get();
- }
-
- /**
* The ordering to use when fetching offers from OfferManager.
*/
public Ordering<HostOffer> getOfferOrder() {
http://git-wip-us.apache.org/repos/asf/aurora/blob/a9353896/src/main/java/org/apache/aurora/scheduler/offers/OffersModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OffersModule.java b/src/main/java/org/apache/aurora/scheduler/offers/OffersModule.java
index ab98add..e6b2c55 100644
--- a/src/main/java/org/apache/aurora/scheduler/offers/OffersModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/offers/OffersModule.java
@@ -22,6 +22,7 @@ import javax.inject.Singleton;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
+import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Ordering;
import com.google.inject.AbstractModule;
@@ -55,6 +56,12 @@ public class OffersModule extends AbstractModule {
@Parameters(separators = "=")
public static class Options {
+ @Parameter(names = "-hold_offers_forever",
+ description =
+ "Hold resource offers indefinitely, disabling automatic offer decline settings.",
+ arity = 1)
+ public boolean holdOffersForever = false;
+
@Parameter(names = "-min_offer_hold_time",
validateValueWith = NotNegativeAmount.class,
description = "Minimum amount of time to hold a resource offer before declining.")
@@ -122,15 +129,17 @@ public class OffersModule extends AbstractModule {
@Override
protected void configure() {
Options options = cliOptions.offer;
- long offerHoldTime =
- options.offerHoldJitterWindow.as(Time.SECONDS) + options.minOfferHoldTime.as(Time.SECONDS);
- if (options.unavailabilityThreshold.as(Time.SECONDS) < offerHoldTime) {
- LOG.warn("unavailability_threshold ({}) is less than the sum of min_offer_hold_time ({}) and"
- + "offer_hold_jitter_window ({}). This creates risks of races between launching and"
- + "draining",
- options.unavailabilityThreshold,
- options.minOfferHoldTime,
- options.offerHoldJitterWindow);
+ if (!options.holdOffersForever) {
+ long offerHoldTime = options.offerHoldJitterWindow.as(Time.SECONDS)
+ + options.minOfferHoldTime.as(Time.SECONDS);
+ if (options.unavailabilityThreshold.as(Time.SECONDS) < offerHoldTime) {
+ LOG.warn("unavailability_threshold ({}) is less than the sum of min_offer_hold_time ({})"
+ + " and offer_hold_jitter_window ({}). This creates risks of races between "
+ + "launching and draining",
+ options.unavailabilityThreshold,
+ options.minOfferHoldTime,
+ options.offerHoldJitterWindow);
+ }
}
for (Module module: MoreModules.instantiateAll(options.offerOrderModules, cliOptions)) {
@@ -144,6 +153,17 @@ public class OffersModule extends AbstractModule {
install(new PrivateModule() {
@Override
protected void configure() {
+ if (options.holdOffersForever) {
+ bind(Deferment.class).to(Deferment.Noop.class);
+ } else {
+ bind(new TypeLiteral<Supplier<Amount<Long, Time>>>() { }).toInstance(
+ new RandomJitterReturnDelay(
+ options.minOfferHoldTime.as(Time.MILLISECONDS),
+ options.offerHoldJitterWindow.as(Time.MILLISECONDS),
+ Random.Util.newDefaultRandom()));
+ bind(Deferment.class).to(Deferment.DelayedDeferment.class);
+ }
+
bind(OfferManager.class).to(OfferManager.OfferManagerImpl.class);
bind(OfferManager.OfferManagerImpl.class).in(Singleton.class);
expose(OfferManager.class);
@@ -155,12 +175,6 @@ public class OffersModule extends AbstractModule {
@Provides
@Singleton
OfferSettings provideOfferSettings(Ordering<HostOffer> offerOrdering) {
- return new OfferSettings(
- cliOptions.offer.offerFilterDuration,
- new RandomJitterReturnDelay(
- cliOptions.offer.minOfferHoldTime.as(Time.MILLISECONDS),
- cliOptions.offer.offerHoldJitterWindow.as(Time.MILLISECONDS),
- Random.Util.newDefaultRandom()),
- offerOrdering);
+ return new OfferSettings(cliOptions.offer.offerFilterDuration, offerOrdering);
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/a9353896/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java b/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
index 8f4f63c..5b50244 100644
--- a/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
@@ -117,6 +117,7 @@ public class CommandLineTest {
expected.reconciliation.reconciliationScheduleSpread = TEST_TIME;
expected.reconciliation.reconciliationBatchSize = 42;
expected.reconciliation.reconciliationBatchInterval = TEST_TIME;
+ expected.offer.holdOffersForever = true;
expected.offer.minOfferHoldTime = TEST_TIME;
expected.offer.offerHoldJitterWindow = TEST_TIME;
expected.offer.offerFilterDuration = TEST_TIME;
@@ -269,6 +270,7 @@ public class CommandLineTest {
"-reconciliation_schedule_spread=42days",
"-reconciliation_explicit_batch_size=42",
"-reconciliation_explicit_batch_interval=42days",
+ "-hold_offers_forever=true",
"-min_offer_hold_time=42days",
"-offer_hold_jitter_window=42days",
"-offer_filter_duration=42days",
http://git-wip-us.apache.org/repos/asf/aurora/blob/a9353896/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
index 815a7e8..6c8434e 100644
--- a/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
@@ -32,6 +32,7 @@ import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
import org.apache.aurora.scheduler.events.PubsubEvent.HostAttributesChanged;
import org.apache.aurora.scheduler.mesos.Driver;
+import org.apache.aurora.scheduler.offers.Deferment.Noop;
import org.apache.aurora.scheduler.offers.OfferManager.OfferManagerImpl;
import org.apache.aurora.scheduler.resources.ResourceType;
import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
@@ -101,42 +102,31 @@ public class OfferManagerImplTest extends EasyMockTest {
.setLaunch(Operation.Launch.newBuilder().addTaskInfos(TASK_INFO))
.build();
private static final List<Operation> OPERATIONS = ImmutableList.of(launch);
- private static final long OFFER_FILTER_SECONDS = 0L;
+ private static final long OFFER_FILTER_SECONDS = 0;
private static final Filters OFFER_FILTER = Filters.newBuilder()
.setRefuseSeconds(OFFER_FILTER_SECONDS)
.build();
private Driver driver;
- private FakeScheduledExecutor clock;
private OfferManagerImpl offerManager;
private FakeStatsProvider statsProvider;
@Before
public void setUp() {
driver = createMock(Driver.class);
- DelayExecutor executorMock = createMock(DelayExecutor.class);
- clock = FakeScheduledExecutor.fromDelayExecutor(executorMock);
- addTearDown(clock::assertEmpty);
OfferSettings offerSettings = new OfferSettings(
Amount.of(OFFER_FILTER_SECONDS, Time.SECONDS),
- () -> RETURN_DELAY,
ImmutableList.of(OfferOrder.RANDOM));
statsProvider = new FakeStatsProvider();
- offerManager = new OfferManagerImpl(driver, offerSettings, statsProvider, executorMock);
+ offerManager = new OfferManagerImpl(driver, offerSettings, statsProvider, new Noop());
}
@Test
- public void testOffersSortedByUnavailability() throws Exception {
- clock.advance(Amount.of(1L, Time.HOURS));
-
- HostOffer hostOfferB = setUnavailability(OFFER_B, clock.nowMillis());
- Long offerCStartTime = clock.nowMillis() + ONE_HOUR.as(Time.MILLISECONDS);
+ public void testOffersSortedByUnavailability() {
+ HostOffer hostOfferB = setUnavailability(OFFER_B, 1);
+ long offerCStartTime = ONE_HOUR.as(Time.MILLISECONDS);
HostOffer hostOfferC = setUnavailability(OFFER_C, offerCStartTime);
- driver.declineOffer(OFFER_B.getOffer().getId(), OFFER_FILTER);
- driver.declineOffer(OFFER_A.getOffer().getId(), OFFER_FILTER);
- driver.declineOffer(OFFER_C.getOffer().getId(), OFFER_FILTER);
-
control.replay();
offerManager.addOffer(hostOfferB);
@@ -149,8 +139,6 @@ public class OfferManagerImplTest extends EasyMockTest {
// hostOfferC has a further away start time, so it should be preferred.
ImmutableList.of(OFFER_A, hostOfferC, hostOfferB),
actual);
-
- clock.advance(RETURN_DELAY);
}
@Test
@@ -163,35 +151,23 @@ public class OfferManagerImplTest extends EasyMockTest {
driver.acceptOffers(OFFER_B.getOffer().getId(), OPERATIONS, OFFER_FILTER);
expectLastCall();
- driver.declineOffer(OFFER_A_ID, OFFER_FILTER);
- expectLastCall();
- driver.declineOffer(offerC.getOffer().getId(), OFFER_FILTER);
- expectLastCall();
-
control.replay();
offerManager.addOffer(offerA);
- assertEquals(1L, statsProvider.getLongValue(OUTSTANDING_OFFERS));
+ assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS));
offerManager.addOffer(OFFER_B);
- assertEquals(2L, statsProvider.getLongValue(OUTSTANDING_OFFERS));
+ assertEquals(2, statsProvider.getLongValue(OUTSTANDING_OFFERS));
offerManager.addOffer(offerC);
- assertEquals(3L, statsProvider.getLongValue(OUTSTANDING_OFFERS));
+ assertEquals(3, statsProvider.getLongValue(OUTSTANDING_OFFERS));
assertEquals(
ImmutableSet.of(OFFER_B, offerA, offerC),
ImmutableSet.copyOf(offerManager.getOffers()));
offerManager.launchTask(OFFER_B.getOffer().getId(), TASK_INFO);
- assertEquals(2L, statsProvider.getLongValue(OUTSTANDING_OFFERS));
- clock.advance(RETURN_DELAY);
- assertEquals(0L, statsProvider.getLongValue(OUTSTANDING_OFFERS));
+ assertEquals(2, statsProvider.getLongValue(OUTSTANDING_OFFERS));
}
@Test
- public void hostAttributeChangeUpdatesOfferSorting() throws Exception {
- driver.declineOffer(OFFER_A_ID, OFFER_FILTER);
- expectLastCall();
- driver.declineOffer(OFFER_B.getOffer().getId(), OFFER_FILTER);
- expectLastCall();
-
+ public void hostAttributeChangeUpdatesOfferSorting() {
control.replay();
offerManager.hostAttributesChanged(new HostAttributesChanged(HOST_ATTRIBUTES_A));
@@ -209,8 +185,6 @@ public class OfferManagerImplTest extends EasyMockTest {
offerManager.hostAttributesChanged(new HostAttributesChanged(offerA.getAttributes()));
offerManager.hostAttributesChanged(new HostAttributesChanged(offerB.getAttributes()));
assertEquals(ImmutableSet.of(OFFER_A, OFFER_B), ImmutableSet.copyOf(offerManager.getOffers()));
-
- clock.advance(RETURN_DELAY);
}
@Test
@@ -221,108 +195,84 @@ public class OfferManagerImplTest extends EasyMockTest {
control.replay();
offerManager.addOffer(OFFER_A);
- assertEquals(1L, statsProvider.getLongValue(OUTSTANDING_OFFERS));
+ assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS));
offerManager.addOffer(OFFER_A);
- assertEquals(0L, statsProvider.getLongValue(OUTSTANDING_OFFERS));
-
- clock.advance(RETURN_DELAY);
+ assertEquals(0, statsProvider.getLongValue(OUTSTANDING_OFFERS));
}
@Test
- public void testGetOffersReturnsAllOffers() throws Exception {
+ public void testGetOffersReturnsAllOffers() {
control.replay();
offerManager.addOffer(OFFER_A);
assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers()));
- assertEquals(1L, statsProvider.getLongValue(OUTSTANDING_OFFERS));
+ assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS));
offerManager.cancelOffer(OFFER_A_ID);
- assertEquals(0L, statsProvider.getLongValue(OFFER_CANCEL_FAILURES));
+ assertEquals(0, statsProvider.getLongValue(OFFER_CANCEL_FAILURES));
assertTrue(Iterables.isEmpty(offerManager.getOffers()));
- assertEquals(0L, statsProvider.getLongValue(OUTSTANDING_OFFERS));
-
- clock.advance(RETURN_DELAY);
+ assertEquals(0, statsProvider.getLongValue(OUTSTANDING_OFFERS));
}
@Test
- public void testOfferFilteringDueToStaticBan() throws Exception {
- driver.declineOffer(OFFER_A_ID, OFFER_FILTER);
- expectLastCall();
-
+ public void testOfferFilteringDueToStaticBan() {
control.replay();
// Static ban ignored when now offers.
offerManager.banOfferForTaskGroup(OFFER_A_ID, GROUP_KEY);
- assertEquals(0L, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
+ assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
offerManager.addOffer(OFFER_A);
assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers(GROUP_KEY)));
assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers()));
// Add static ban.
offerManager.banOfferForTaskGroup(OFFER_A_ID, GROUP_KEY);
- assertEquals(1L, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
+ assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers()));
assertTrue(Iterables.isEmpty(offerManager.getOffers(GROUP_KEY)));
-
- clock.advance(RETURN_DELAY);
- assertEquals(0L, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
}
@Test
- public void testStaticBanIsClearedOnOfferReturn() throws Exception {
- driver.declineOffer(OFFER_A_ID, OFFER_FILTER);
- expectLastCall().times(2);
-
+ public void testStaticBanIsClearedOnOfferReturn() {
control.replay();
offerManager.addOffer(OFFER_A);
offerManager.banOfferForTaskGroup(OFFER_A_ID, GROUP_KEY);
assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers()));
assertTrue(Iterables.isEmpty(offerManager.getOffers(GROUP_KEY)));
- assertEquals(1L, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
+ assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
// Make sure the static ban is cleared when the offers are returned.
- clock.advance(RETURN_DELAY);
+ offerManager.cancelOffer(OFFER_A_ID);
offerManager.addOffer(OFFER_A);
assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers(GROUP_KEY)));
- assertEquals(0L, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
-
- clock.advance(RETURN_DELAY);
+ assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
}
@Test
- public void testStaticBanIsClearedOnDriverDisconnect() throws Exception {
- driver.declineOffer(OFFER_A_ID, OFFER_FILTER);
- expectLastCall();
-
+ public void testStaticBanIsClearedOnDriverDisconnect() {
control.replay();
offerManager.addOffer(OFFER_A);
offerManager.banOfferForTaskGroup(OFFER_A_ID, GROUP_KEY);
assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers()));
assertTrue(Iterables.isEmpty(offerManager.getOffers(GROUP_KEY)));
- assertEquals(1L, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
+ assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
// Make sure the static ban is cleared when driver is disconnected.
offerManager.driverDisconnected(new DriverDisconnected());
- assertEquals(0L, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
+ assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
offerManager.addOffer(OFFER_A);
assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers(GROUP_KEY)));
-
- clock.advance(RETURN_DELAY);
}
@Test
- public void getOffer() {
- driver.declineOffer(OFFER_A_ID, OFFER_FILTER);
- expectLastCall();
-
+ public void testGetOffer() {
control.replay();
offerManager.addOffer(OFFER_A);
assertEquals(Optional.of(OFFER_A), offerManager.getOffer(OFFER_A.getOffer().getAgentId()));
- assertEquals(1L, statsProvider.getLongValue(OUTSTANDING_OFFERS));
- clock.advance(RETURN_DELAY);
+ assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS));
}
@Test(expected = OfferManager.LaunchException.class)
@@ -333,12 +283,7 @@ public class OfferManagerImplTest extends EasyMockTest {
control.replay();
offerManager.addOffer(OFFER_A);
-
- try {
- offerManager.launchTask(OFFER_A_ID, TASK_INFO);
- } finally {
- clock.advance(RETURN_DELAY);
- }
+ offerManager.launchTask(OFFER_A_ID, TASK_INFO);
}
@Test
@@ -348,66 +293,48 @@ public class OfferManagerImplTest extends EasyMockTest {
offerManager.launchTask(OFFER_A_ID, TASK_INFO);
fail("Method invocation is expected to throw exception.");
} catch (OfferManager.LaunchException e) {
- assertEquals(1L, statsProvider.getLongValue(OFFER_ACCEPT_RACES));
+ assertEquals(1, statsProvider.getLongValue(OFFER_ACCEPT_RACES));
}
}
@Test
- public void testFlushOffers() throws Exception {
+ public void testFlushOffers() {
control.replay();
offerManager.addOffer(OFFER_A);
offerManager.addOffer(OFFER_B);
- assertEquals(2L, statsProvider.getLongValue(OUTSTANDING_OFFERS));
+ assertEquals(2, statsProvider.getLongValue(OUTSTANDING_OFFERS));
offerManager.driverDisconnected(new DriverDisconnected());
- assertEquals(0L, statsProvider.getLongValue(OUTSTANDING_OFFERS));
- clock.advance(RETURN_DELAY);
+ assertEquals(0, statsProvider.getLongValue(OUTSTANDING_OFFERS));
}
@Test
- public void testCancelFailure() throws Exception {
+ public void testCancelFailure() {
control.replay();
offerManager.cancelOffer(OFFER_A.getOffer().getId());
- assertEquals(1L, statsProvider.getLongValue(OFFER_CANCEL_FAILURES));
- }
-
- @Test
- public void testDeclineOffer() throws Exception {
- driver.declineOffer(OFFER_A.getOffer().getId(), OFFER_FILTER);
- expectLastCall();
-
- control.replay();
-
- offerManager.addOffer(OFFER_A);
- assertEquals(1L, statsProvider.getLongValue(OUTSTANDING_OFFERS));
- clock.advance(RETURN_DELAY);
- assertEquals(0L, statsProvider.getLongValue(OUTSTANDING_OFFERS));
+ assertEquals(1, statsProvider.getLongValue(OFFER_CANCEL_FAILURES));
}
@Test
- public void testBanAndUnbanOffer() throws Exception {
- driver.declineOffer(OFFER_A.getOffer().getId(), OFFER_FILTER);
- expectLastCall().times(2);
+ public void testBanAndUnbanOffer() {
control.replay();
// After adding a banned offer, user can see it is in OUTSTANDING_OFFERS but cannot retrieve it.
offerManager.banOffer(OFFER_A_ID);
offerManager.addOffer(OFFER_A);
- assertEquals(1L, statsProvider.getLongValue(OUTSTANDING_OFFERS));
- assertEquals(1L, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS));
+ assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS));
+ assertEquals(1, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS));
assertTrue(Iterables.isEmpty(offerManager.getOffers(GROUP_KEY)));
- clock.advance(RETURN_DELAY);
offerManager.cancelOffer(OFFER_A_ID);
offerManager.addOffer(OFFER_A);
- assertEquals(1L, statsProvider.getLongValue(OUTSTANDING_OFFERS));
- assertEquals(0L, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS));
+ assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS));
+ assertEquals(0, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS));
assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers(GROUP_KEY)));
- clock.advance(RETURN_DELAY);
}
- private static HostOffer setUnavailability(HostOffer offer, Long startMs) {
+ private static HostOffer setUnavailability(HostOffer offer, long startMs) {
Unavailability unavailability = Unavailability.newBuilder()
.setStart(TimeInfo.newBuilder().setNanoseconds(startMs * 1000L)).build();
return new HostOffer(
@@ -422,17 +349,13 @@ public class OfferManagerImplTest extends EasyMockTest {
}
private OfferManager createOrderedManager(List<OfferOrder> order) {
- OfferSettings settings = new OfferSettings(
- Amount.of(OFFER_FILTER_SECONDS, Time.SECONDS),
- () -> RETURN_DELAY,
- order);
- DelayExecutor executorMock = createMock(DelayExecutor.class);
- clock = FakeScheduledExecutor.fromDelayExecutor(executorMock);
- return new OfferManagerImpl(driver, settings, statsProvider, executorMock);
+ OfferSettings settings =
+ new OfferSettings(Amount.of(OFFER_FILTER_SECONDS, Time.SECONDS), order);
+ return new OfferManagerImpl(driver, settings, statsProvider, new Noop());
}
@Test
- public void testCPUOrdering() throws Exception {
+ public void testCPUOrdering() {
OfferManager cpuManager = createOrderedManager(ImmutableList.of(OfferOrder.CPU));
HostOffer small = setMode(new HostOffer(
@@ -464,7 +387,7 @@ public class OfferManagerImplTest extends EasyMockTest {
}
@Test
- public void testRevocableCPUOrdering() throws Exception {
+ public void testRevocableCPUOrdering() {
ResourceType.initializeEmptyCliArgsForTest();
OfferManager cpuManager = createOrderedManager(ImmutableList.of(OfferOrder.REVOCABLE_CPU));
@@ -501,7 +424,7 @@ public class OfferManagerImplTest extends EasyMockTest {
}
@Test
- public void testDiskOrdering() throws Exception {
+ public void testDiskOrdering() {
OfferManager cpuManager = createOrderedManager(ImmutableList.of(OfferOrder.DISK));
HostOffer small = setMode(new HostOffer(
@@ -529,7 +452,7 @@ public class OfferManagerImplTest extends EasyMockTest {
}
@Test
- public void testMemoryOrdering() throws Exception {
+ public void testMemoryOrdering() {
OfferManager cpuManager = createOrderedManager(ImmutableList.of(OfferOrder.MEMORY));
HostOffer small = setMode(new HostOffer(
@@ -557,7 +480,7 @@ public class OfferManagerImplTest extends EasyMockTest {
}
@Test
- public void testCPUMemoryOrdering() throws Exception {
+ public void testCPUMemoryOrdering() {
OfferManager cpuManager = createOrderedManager(
ImmutableList.of(OfferOrder.CPU, OfferOrder.MEMORY));
@@ -594,4 +517,32 @@ public class OfferManagerImplTest extends EasyMockTest {
assertEquals(ImmutableList.of(small, medium, large),
ImmutableList.copyOf(cpuManager.getOffers()));
}
+
+ @Test
+ public void testDelayedOfferReturn() {
+ OfferSettings settings = new OfferSettings(
+ Amount.of(OFFER_FILTER_SECONDS, Time.SECONDS),
+ ImmutableList.of(OfferOrder.RANDOM));
+ DelayExecutor executorMock = createMock(DelayExecutor.class);
+ FakeScheduledExecutor clock = FakeScheduledExecutor.fromDelayExecutor(executorMock);
+ addTearDown(clock::assertEmpty);
+ offerManager = new OfferManagerImpl(
+ driver,
+ settings,
+ statsProvider,
+ new Deferment.DelayedDeferment(() -> RETURN_DELAY, executorMock));
+
+ driver.declineOffer(OFFER_A_ID, OFFER_FILTER);
+
+ control.replay();
+
+ offerManager.addOffer(OFFER_A);
+ offerManager.banOfferForTaskGroup(OFFER_A_ID, GROUP_KEY);
+ assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS));
+ assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
+
+ clock.advance(RETURN_DELAY);
+ assertEquals(0, statsProvider.getLongValue(OUTSTANDING_OFFERS));
+ assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
+ }
}