You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2017/10/20 02:38:58 UTC

aurora git commit: Provide a formal way to disable offer declining

Repository: aurora
Updated Branches:
  refs/heads/master 53970139d -> a93538966


Provide a formal way to disable offer declining

Increasing the offer hold time to effectively disable offer declines is a trap,
as the queue of asynchronous declines will grow without bound.  This introduces
a command line argument to explicitly disable declining.

Reviewed at https://reviews.apache.org/r/63157/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/a9353896
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/a9353896
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/a9353896

Branch: refs/heads/master
Commit: a935389662b72486f49e507493557d360dc97178
Parents: 5397013
Author: Bill Farner <wf...@apache.org>
Authored: Thu Oct 19 19:39:02 2017 -0700
Committer: Bill Farner <wf...@apache.org>
Committed: Thu Oct 19 19:39:02 2017 -0700

----------------------------------------------------------------------
 RELEASE-NOTES.md                                |   5 +-
 .../aurora/benchmark/SchedulingBenchmarks.java  |   7 +-
 .../aurora/scheduler/offers/Deferment.java      |  70 +++++++
 .../aurora/scheduler/offers/OfferManager.java   |  54 +++--
 .../aurora/scheduler/offers/OfferSettings.java  |  25 +--
 .../aurora/scheduler/offers/OffersModule.java   |  46 +++--
 .../scheduler/config/CommandLineTest.java       |   2 +
 .../scheduler/offers/OfferManagerImplTest.java  | 205 +++++++------------
 8 files changed, 222 insertions(+), 192 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/a9353896/RELEASE-NOTES.md
----------------------------------------------------------------------
diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index f4cc416..1ec6d74 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -16,7 +16,10 @@
 - Added the `thrift_method_interceptor_modules` scheduler flag that lets cluster operators inject
   custom Thrift method interceptors.
 - Increase default ZooKeeper session timeout from 4 to 15 seconds.
-- Add option `-zk_connection_timeout` to control the connection timeout of ZooKeeper connections.
+- Added option `-zk_connection_timeout` to control the connection timeout of ZooKeeper connections.
+- Added scheduler command line argument `-hold_offers_forever`, suitable for use in clusters where
+  Aurora is the only framework.  This setting disables other options such as `-min_offer_hold_time`,
+  and allows the scheduler to more efficiently cache scheduling attempts.
 
 ### Deprecations and removals:
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/a9353896/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
index 7d37668..5a9099b 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
@@ -55,6 +55,7 @@ import org.apache.aurora.scheduler.filter.SchedulingFilter;
 import org.apache.aurora.scheduler.filter.SchedulingFilterImpl;
 import org.apache.aurora.scheduler.mesos.Driver;
 import org.apache.aurora.scheduler.mesos.TestExecutorSettings;
+import org.apache.aurora.scheduler.offers.Deferment;
 import org.apache.aurora.scheduler.offers.OfferManager;
 import org.apache.aurora.scheduler.offers.OfferOrder;
 import org.apache.aurora.scheduler.offers.OfferSettings;
@@ -148,13 +149,11 @@ public class SchedulingBenchmarks {
                       // No-op.
                     }
                   });
+              bind(Deferment.class).to(Deferment.Noop.class);
               bind(OfferManager.class).to(OfferManager.OfferManagerImpl.class);
               bind(OfferManager.OfferManagerImpl.class).in(Singleton.class);
               bind(OfferSettings.class).toInstance(
-                  new OfferSettings(
-                      NO_DELAY,
-                      () -> DELAY_FOREVER,
-                      ImmutableList.of(OfferOrder.RANDOM)));
+                  new OfferSettings(NO_DELAY, ImmutableList.of(OfferOrder.RANDOM)));
               bind(BiCache.BiCacheSettings.class).toInstance(
                   new BiCache.BiCacheSettings(DELAY_FOREVER, ""));
               bind(TaskScheduler.class).to(TaskScheduler.TaskSchedulerImpl.class);

http://git-wip-us.apache.org/repos/asf/aurora/blob/a9353896/src/main/java/org/apache/aurora/scheduler/offers/Deferment.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/Deferment.java b/src/main/java/org/apache/aurora/scheduler/offers/Deferment.java
new file mode 100644
index 0000000..f3ec886
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/offers/Deferment.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.offers;
+
+import javax.inject.Inject;
+
+import com.google.common.base.Supplier;
+
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
+import org.apache.aurora.scheduler.async.DelayExecutor;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Determines if and when a deferred action should be performed.
+ */
+public interface Deferment {
+
+  /**
+   * Defers an action to possibly be performed at some point in the future.
+   *
+   * @param action Callback to perform the deferred action.
+   */
+  void defer(Runnable action);
+
+  /**
+   * Never performs deferred actions.
+   */
+  class Noop implements Deferment {
+    @Override
+    public void defer(Runnable action) {
+      // no-op
+    }
+  }
+
+  /**
+   * Performs a deferred action after a dynamic delay.
+   */
+  class DelayedDeferment implements Deferment {
+    private final Supplier<Amount<Long, Time>> delay;
+    private final DelayExecutor executor;
+
+    @Inject
+    public DelayedDeferment(
+        Supplier<Amount<Long, Time>> delay,
+        @AsyncExecutor DelayExecutor executor) {
+
+      this.delay = requireNonNull(delay);
+      this.executor = requireNonNull(executor);
+    }
+
+    @Override
+    public void defer(Runnable action) {
+      executor.execute(action, delay.get());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/a9353896/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
index e833431..7011a4c 100644
--- a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
@@ -37,8 +37,6 @@ import org.apache.aurora.common.inject.TimedInterceptor.Timed;
 import org.apache.aurora.common.quantity.Time;
 import org.apache.aurora.common.stats.StatsProvider;
 import org.apache.aurora.scheduler.HostOffer;
-import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
-import org.apache.aurora.scheduler.async.DelayExecutor;
 import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
@@ -165,7 +163,7 @@ public interface OfferManager extends EventSubscriber {
 
     private final Driver driver;
     private final OfferSettings offerSettings;
-    private final DelayExecutor executor;
+    private final Deferment offerDecline;
 
     @Inject
     @VisibleForTesting
@@ -173,36 +171,28 @@ public interface OfferManager extends EventSubscriber {
         Driver driver,
         OfferSettings offerSettings,
         StatsProvider statsProvider,
-        @AsyncExecutor DelayExecutor executor) {
+        Deferment offerDecline) {
 
       this.driver = requireNonNull(driver);
       this.offerSettings = requireNonNull(offerSettings);
-      this.executor = requireNonNull(executor);
       this.hostOffers = new HostOffers(statsProvider, offerSettings.getOfferOrder());
       this.offerRaces = statsProvider.makeCounter(OFFER_ACCEPT_RACES);
       this.offerCancelFailures = statsProvider.makeCounter(OFFER_CANCEL_FAILURES);
+      this.offerDecline = requireNonNull(offerDecline);
     }
 
     @Override
-    public void addOffer(final HostOffer offer) {
-      // We run a slight risk of a race here, which is acceptable.  The worst case is that we
-      // temporarily hold two offers for the same host, which should be corrected when we return
-      // them after the return delay.
-      // There's also a chance that we return an offer for compaction ~simultaneously with the
-      // same-host offer being cancelled/returned.  This is also fine.
-      Optional<HostOffer> sameSlave = hostOffers.get(offer.getOffer().getAgentId());
-      if (sameSlave.isPresent()) {
-        // If there are existing offers for the slave, decline all of them so the master can
-        // compact all of those offers into a single offer and send them back.
+    public void addOffer(HostOffer offer) {
+      Optional<HostOffer> sameAgent = hostOffers.addAndPreventAgentCollision(offer);
+      if (sameAgent.isPresent()) {
+        // We have an existing offer for the same agent.  We choose to return both offers so that
+        // they may be combined into a single offer.
         LOG.info("Returning offers for " + offer.getOffer().getAgentId().getValue()
             + " for compaction.");
         decline(offer.getOffer().getId());
-        removeAndDecline(sameSlave.get().getOffer().getId());
+        decline(sameAgent.get().getOffer().getId());
       } else {
-        hostOffers.add(offer);
-        executor.execute(
-            () -> removeAndDecline(offer.getOffer().getId()),
-            offerSettings.getOfferReturnDelay());
+        offerDecline.defer(() -> removeAndDecline(offer.getOffer().getId()));
       }
     }
 
@@ -326,7 +316,27 @@ public interface OfferManager extends EventSubscriber {
         return Optional.of(offer);
       }
 
-      synchronized void add(HostOffer offer) {
+      /**
+       * Adds an offer while maintaining a guarantee that no two offers may exist with the same
+       * agent ID.  If an offer exists with the same agent ID, the existing offer is removed
+       * and returned, and {@code offer} is not added.
+       *
+       * @param offer Offer to add.
+       * @return The pre-existing offer with the same agent ID as {@code offer}, if one exists,
+       *         which will also be removed prior to returning.
+       */
+      synchronized Optional<HostOffer> addAndPreventAgentCollision(HostOffer offer) {
+        HostOffer sameAgent = offersBySlave.get(offer.getOffer().getAgentId());
+        if (sameAgent != null) {
+          remove(sameAgent.getOffer().getId());
+          return Optional.of(sameAgent);
+        }
+
+        addInternal(offer);
+        return Optional.absent();
+      }
+
+      private void addInternal(HostOffer offer) {
         offers.add(offer);
         offersById.put(offer.getOffer().getId(), offer);
         offersBySlave.put(offer.getOffer().getAgentId(), offer);
@@ -350,7 +360,7 @@ public interface OfferManager extends EventSubscriber {
         if (offer != null) {
           // Remove and re-add a host's offer to re-sort based on its new hostStatus
           remove(offer.getOffer().getId());
-          add(new HostOffer(offer.getOffer(), attributes));
+          addInternal(new HostOffer(offer.getOffer(), attributes));
         }
       }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/a9353896/src/main/java/org/apache/aurora/scheduler/offers/OfferSettings.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OfferSettings.java b/src/main/java/org/apache/aurora/scheduler/offers/OfferSettings.java
index 4c6fd54..e060f20 100644
--- a/src/main/java/org/apache/aurora/scheduler/offers/OfferSettings.java
+++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferSettings.java
@@ -15,7 +15,6 @@ package org.apache.aurora.scheduler.offers;
 
 import java.util.List;
 
-import com.google.common.base.Supplier;
 import com.google.common.collect.Ordering;
 
 import org.apache.aurora.common.quantity.Amount;
@@ -30,24 +29,14 @@ import static java.util.Objects.requireNonNull;
 public class OfferSettings {
 
   private final Amount<Long, Time> offerFilterDuration;
-  private final Supplier<Amount<Long, Time>> returnDelaySupplier;
   private final Ordering<HostOffer> offerOrder;
 
-  public OfferSettings(
-      Amount<Long, Time> offerFilterDuration,
-      Supplier<Amount<Long, Time>> returnDelaySupplier,
-      List<OfferOrder> offerOrder) {
-
-    this(offerFilterDuration, returnDelaySupplier, OfferOrderBuilder.create(offerOrder));
+  public OfferSettings(Amount<Long, Time> offerFilterDuration, List<OfferOrder> offerOrder) {
+    this(offerFilterDuration, OfferOrderBuilder.create(offerOrder));
   }
 
-  OfferSettings(
-      Amount<Long, Time> offerFilterDuration,
-      Supplier<Amount<Long, Time>> returnDelaySupplier,
-      Ordering<HostOffer> offerOrder) {
-
+  OfferSettings(Amount<Long, Time> offerFilterDuration, Ordering<HostOffer> offerOrder) {
     this.offerFilterDuration = requireNonNull(offerFilterDuration);
-    this.returnDelaySupplier = requireNonNull(returnDelaySupplier);
     this.offerOrder = requireNonNull(offerOrder);
   }
 
@@ -59,14 +48,6 @@ public class OfferSettings {
   }
 
   /**
-   * The amount of time after which an unused offer should be 'returned' to Mesos by declining it.
-   * The delay is calculated for each offer using a random duration within a fixed window.
-   */
-  public Amount<Long, Time> getOfferReturnDelay() {
-    return returnDelaySupplier.get();
-  }
-
-  /**
    * The ordering to use when fetching offers from OfferManager.
    */
   public Ordering<HostOffer> getOfferOrder() {

http://git-wip-us.apache.org/repos/asf/aurora/blob/a9353896/src/main/java/org/apache/aurora/scheduler/offers/OffersModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OffersModule.java b/src/main/java/org/apache/aurora/scheduler/offers/OffersModule.java
index ab98add..e6b2c55 100644
--- a/src/main/java/org/apache/aurora/scheduler/offers/OffersModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/offers/OffersModule.java
@@ -22,6 +22,7 @@ import javax.inject.Singleton;
 
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.Parameters;
+import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Ordering;
 import com.google.inject.AbstractModule;
@@ -55,6 +56,12 @@ public class OffersModule extends AbstractModule {
 
   @Parameters(separators = "=")
   public static class Options {
+    @Parameter(names = "-hold_offers_forever",
+        description =
+            "Hold resource offers indefinitely, disabling automatic offer decline settings.",
+        arity = 1)
+    public boolean holdOffersForever = false;
+
     @Parameter(names = "-min_offer_hold_time",
         validateValueWith = NotNegativeAmount.class,
         description = "Minimum amount of time to hold a resource offer before declining.")
@@ -122,15 +129,17 @@ public class OffersModule extends AbstractModule {
   @Override
   protected void configure() {
     Options options = cliOptions.offer;
-    long offerHoldTime =
-        options.offerHoldJitterWindow.as(Time.SECONDS) + options.minOfferHoldTime.as(Time.SECONDS);
-    if (options.unavailabilityThreshold.as(Time.SECONDS) < offerHoldTime) {
-      LOG.warn("unavailability_threshold ({}) is less than the sum of min_offer_hold_time ({}) and"
-          + "offer_hold_jitter_window ({}). This creates risks of races between launching and"
-          + "draining",
-          options.unavailabilityThreshold,
-          options.minOfferHoldTime,
-          options.offerHoldJitterWindow);
+    if (!options.holdOffersForever) {
+      long offerHoldTime = options.offerHoldJitterWindow.as(Time.SECONDS)
+          + options.minOfferHoldTime.as(Time.SECONDS);
+      if (options.unavailabilityThreshold.as(Time.SECONDS) < offerHoldTime) {
+        LOG.warn("unavailability_threshold ({}) is less than the sum of min_offer_hold_time ({})"
+                + " and offer_hold_jitter_window ({}). This creates risks of races between "
+                + "launching and draining",
+            options.unavailabilityThreshold,
+            options.minOfferHoldTime,
+            options.offerHoldJitterWindow);
+      }
     }
 
     for (Module module: MoreModules.instantiateAll(options.offerOrderModules, cliOptions)) {
@@ -144,6 +153,17 @@ public class OffersModule extends AbstractModule {
     install(new PrivateModule() {
       @Override
       protected void configure() {
+        if (options.holdOffersForever) {
+          bind(Deferment.class).to(Deferment.Noop.class);
+        } else {
+          bind(new TypeLiteral<Supplier<Amount<Long, Time>>>() { }).toInstance(
+              new RandomJitterReturnDelay(
+                  options.minOfferHoldTime.as(Time.MILLISECONDS),
+                  options.offerHoldJitterWindow.as(Time.MILLISECONDS),
+                  Random.Util.newDefaultRandom()));
+          bind(Deferment.class).to(Deferment.DelayedDeferment.class);
+        }
+
         bind(OfferManager.class).to(OfferManager.OfferManagerImpl.class);
         bind(OfferManager.OfferManagerImpl.class).in(Singleton.class);
         expose(OfferManager.class);
@@ -155,12 +175,6 @@ public class OffersModule extends AbstractModule {
   @Provides
   @Singleton
   OfferSettings provideOfferSettings(Ordering<HostOffer> offerOrdering) {
-    return new OfferSettings(
-        cliOptions.offer.offerFilterDuration,
-        new RandomJitterReturnDelay(
-            cliOptions.offer.minOfferHoldTime.as(Time.MILLISECONDS),
-            cliOptions.offer.offerHoldJitterWindow.as(Time.MILLISECONDS),
-            Random.Util.newDefaultRandom()),
-        offerOrdering);
+    return new OfferSettings(cliOptions.offer.offerFilterDuration, offerOrdering);
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/a9353896/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java b/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
index 8f4f63c..5b50244 100644
--- a/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/config/CommandLineTest.java
@@ -117,6 +117,7 @@ public class CommandLineTest {
     expected.reconciliation.reconciliationScheduleSpread = TEST_TIME;
     expected.reconciliation.reconciliationBatchSize = 42;
     expected.reconciliation.reconciliationBatchInterval = TEST_TIME;
+    expected.offer.holdOffersForever = true;
     expected.offer.minOfferHoldTime = TEST_TIME;
     expected.offer.offerHoldJitterWindow = TEST_TIME;
     expected.offer.offerFilterDuration = TEST_TIME;
@@ -269,6 +270,7 @@ public class CommandLineTest {
         "-reconciliation_schedule_spread=42days",
         "-reconciliation_explicit_batch_size=42",
         "-reconciliation_explicit_batch_interval=42days",
+        "-hold_offers_forever=true",
         "-min_offer_hold_time=42days",
         "-offer_hold_jitter_window=42days",
         "-offer_filter_duration=42days",

http://git-wip-us.apache.org/repos/asf/aurora/blob/a9353896/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
index 815a7e8..6c8434e 100644
--- a/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
@@ -32,6 +32,7 @@ import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
 import org.apache.aurora.scheduler.events.PubsubEvent.HostAttributesChanged;
 import org.apache.aurora.scheduler.mesos.Driver;
+import org.apache.aurora.scheduler.offers.Deferment.Noop;
 import org.apache.aurora.scheduler.offers.OfferManager.OfferManagerImpl;
 import org.apache.aurora.scheduler.resources.ResourceType;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
@@ -101,42 +102,31 @@ public class OfferManagerImplTest extends EasyMockTest {
       .setLaunch(Operation.Launch.newBuilder().addTaskInfos(TASK_INFO))
       .build();
   private static final List<Operation> OPERATIONS = ImmutableList.of(launch);
-  private static final long OFFER_FILTER_SECONDS = 0L;
+  private static final long OFFER_FILTER_SECONDS = 0;
   private static final Filters OFFER_FILTER = Filters.newBuilder()
       .setRefuseSeconds(OFFER_FILTER_SECONDS)
       .build();
 
   private Driver driver;
-  private FakeScheduledExecutor clock;
   private OfferManagerImpl offerManager;
   private FakeStatsProvider statsProvider;
 
   @Before
   public void setUp() {
     driver = createMock(Driver.class);
-    DelayExecutor executorMock = createMock(DelayExecutor.class);
-    clock = FakeScheduledExecutor.fromDelayExecutor(executorMock);
-    addTearDown(clock::assertEmpty);
     OfferSettings offerSettings = new OfferSettings(
         Amount.of(OFFER_FILTER_SECONDS, Time.SECONDS),
-        () -> RETURN_DELAY,
         ImmutableList.of(OfferOrder.RANDOM));
     statsProvider = new FakeStatsProvider();
-    offerManager = new OfferManagerImpl(driver, offerSettings, statsProvider, executorMock);
+    offerManager = new OfferManagerImpl(driver, offerSettings, statsProvider, new Noop());
   }
 
   @Test
-  public void testOffersSortedByUnavailability() throws Exception {
-    clock.advance(Amount.of(1L, Time.HOURS));
-
-    HostOffer hostOfferB = setUnavailability(OFFER_B, clock.nowMillis());
-    Long offerCStartTime = clock.nowMillis() + ONE_HOUR.as(Time.MILLISECONDS);
+  public void testOffersSortedByUnavailability() {
+    HostOffer hostOfferB = setUnavailability(OFFER_B, 1);
+    long offerCStartTime = ONE_HOUR.as(Time.MILLISECONDS);
     HostOffer hostOfferC = setUnavailability(OFFER_C, offerCStartTime);
 
-    driver.declineOffer(OFFER_B.getOffer().getId(), OFFER_FILTER);
-    driver.declineOffer(OFFER_A.getOffer().getId(), OFFER_FILTER);
-    driver.declineOffer(OFFER_C.getOffer().getId(), OFFER_FILTER);
-
     control.replay();
 
     offerManager.addOffer(hostOfferB);
@@ -149,8 +139,6 @@ public class OfferManagerImplTest extends EasyMockTest {
         // hostOfferC has a further away start time, so it should be preferred.
         ImmutableList.of(OFFER_A, hostOfferC, hostOfferB),
         actual);
-
-    clock.advance(RETURN_DELAY);
   }
 
   @Test
@@ -163,35 +151,23 @@ public class OfferManagerImplTest extends EasyMockTest {
     driver.acceptOffers(OFFER_B.getOffer().getId(), OPERATIONS, OFFER_FILTER);
     expectLastCall();
 
-    driver.declineOffer(OFFER_A_ID, OFFER_FILTER);
-    expectLastCall();
-    driver.declineOffer(offerC.getOffer().getId(), OFFER_FILTER);
-    expectLastCall();
-
     control.replay();
 
     offerManager.addOffer(offerA);
-    assertEquals(1L, statsProvider.getLongValue(OUTSTANDING_OFFERS));
+    assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS));
     offerManager.addOffer(OFFER_B);
-    assertEquals(2L, statsProvider.getLongValue(OUTSTANDING_OFFERS));
+    assertEquals(2, statsProvider.getLongValue(OUTSTANDING_OFFERS));
     offerManager.addOffer(offerC);
-    assertEquals(3L, statsProvider.getLongValue(OUTSTANDING_OFFERS));
+    assertEquals(3, statsProvider.getLongValue(OUTSTANDING_OFFERS));
     assertEquals(
         ImmutableSet.of(OFFER_B, offerA, offerC),
         ImmutableSet.copyOf(offerManager.getOffers()));
     offerManager.launchTask(OFFER_B.getOffer().getId(), TASK_INFO);
-    assertEquals(2L, statsProvider.getLongValue(OUTSTANDING_OFFERS));
-    clock.advance(RETURN_DELAY);
-    assertEquals(0L, statsProvider.getLongValue(OUTSTANDING_OFFERS));
+    assertEquals(2, statsProvider.getLongValue(OUTSTANDING_OFFERS));
   }
 
   @Test
-  public void hostAttributeChangeUpdatesOfferSorting() throws Exception {
-    driver.declineOffer(OFFER_A_ID, OFFER_FILTER);
-    expectLastCall();
-    driver.declineOffer(OFFER_B.getOffer().getId(), OFFER_FILTER);
-    expectLastCall();
-
+  public void hostAttributeChangeUpdatesOfferSorting() {
     control.replay();
 
     offerManager.hostAttributesChanged(new HostAttributesChanged(HOST_ATTRIBUTES_A));
@@ -209,8 +185,6 @@ public class OfferManagerImplTest extends EasyMockTest {
     offerManager.hostAttributesChanged(new HostAttributesChanged(offerA.getAttributes()));
     offerManager.hostAttributesChanged(new HostAttributesChanged(offerB.getAttributes()));
     assertEquals(ImmutableSet.of(OFFER_A, OFFER_B), ImmutableSet.copyOf(offerManager.getOffers()));
-
-    clock.advance(RETURN_DELAY);
   }
 
   @Test
@@ -221,108 +195,84 @@ public class OfferManagerImplTest extends EasyMockTest {
     control.replay();
 
     offerManager.addOffer(OFFER_A);
-    assertEquals(1L, statsProvider.getLongValue(OUTSTANDING_OFFERS));
+    assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS));
     offerManager.addOffer(OFFER_A);
-    assertEquals(0L, statsProvider.getLongValue(OUTSTANDING_OFFERS));
-
-    clock.advance(RETURN_DELAY);
+    assertEquals(0, statsProvider.getLongValue(OUTSTANDING_OFFERS));
   }
 
   @Test
-  public void testGetOffersReturnsAllOffers() throws Exception {
+  public void testGetOffersReturnsAllOffers() {
     control.replay();
 
     offerManager.addOffer(OFFER_A);
     assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers()));
-    assertEquals(1L, statsProvider.getLongValue(OUTSTANDING_OFFERS));
+    assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS));
 
     offerManager.cancelOffer(OFFER_A_ID);
-    assertEquals(0L, statsProvider.getLongValue(OFFER_CANCEL_FAILURES));
+    assertEquals(0, statsProvider.getLongValue(OFFER_CANCEL_FAILURES));
     assertTrue(Iterables.isEmpty(offerManager.getOffers()));
-    assertEquals(0L, statsProvider.getLongValue(OUTSTANDING_OFFERS));
-
-    clock.advance(RETURN_DELAY);
+    assertEquals(0, statsProvider.getLongValue(OUTSTANDING_OFFERS));
   }
 
   @Test
-  public void testOfferFilteringDueToStaticBan() throws Exception {
-    driver.declineOffer(OFFER_A_ID, OFFER_FILTER);
-    expectLastCall();
-
+  public void testOfferFilteringDueToStaticBan() {
     control.replay();
 
     // Static ban ignored when now offers.
     offerManager.banOfferForTaskGroup(OFFER_A_ID, GROUP_KEY);
-    assertEquals(0L, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
+    assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
     offerManager.addOffer(OFFER_A);
     assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers(GROUP_KEY)));
     assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers()));
 
     // Add static ban.
     offerManager.banOfferForTaskGroup(OFFER_A_ID, GROUP_KEY);
-    assertEquals(1L, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
+    assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
     assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers()));
     assertTrue(Iterables.isEmpty(offerManager.getOffers(GROUP_KEY)));
-
-    clock.advance(RETURN_DELAY);
-    assertEquals(0L, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
   }
 
   @Test
-  public void testStaticBanIsClearedOnOfferReturn() throws Exception {
-    driver.declineOffer(OFFER_A_ID, OFFER_FILTER);
-    expectLastCall().times(2);
-
+  public void testStaticBanIsClearedOnOfferReturn() {
     control.replay();
 
     offerManager.addOffer(OFFER_A);
     offerManager.banOfferForTaskGroup(OFFER_A_ID, GROUP_KEY);
     assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers()));
     assertTrue(Iterables.isEmpty(offerManager.getOffers(GROUP_KEY)));
-    assertEquals(1L, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
+    assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
 
     // Make sure the static ban is cleared when the offers are returned.
-    clock.advance(RETURN_DELAY);
+    offerManager.cancelOffer(OFFER_A_ID);
     offerManager.addOffer(OFFER_A);
     assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers(GROUP_KEY)));
-    assertEquals(0L, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
-
-    clock.advance(RETURN_DELAY);
+    assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
   }
 
   @Test
-  public void testStaticBanIsClearedOnDriverDisconnect() throws Exception {
-    driver.declineOffer(OFFER_A_ID, OFFER_FILTER);
-    expectLastCall();
-
+  public void testStaticBanIsClearedOnDriverDisconnect() {
     control.replay();
 
     offerManager.addOffer(OFFER_A);
     offerManager.banOfferForTaskGroup(OFFER_A_ID, GROUP_KEY);
     assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers()));
     assertTrue(Iterables.isEmpty(offerManager.getOffers(GROUP_KEY)));
-    assertEquals(1L, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
+    assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
 
     // Make sure the static ban is cleared when driver is disconnected.
     offerManager.driverDisconnected(new DriverDisconnected());
-    assertEquals(0L, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
+    assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
     offerManager.addOffer(OFFER_A);
     assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers(GROUP_KEY)));
-
-    clock.advance(RETURN_DELAY);
   }
 
   @Test
-  public void getOffer() {
-    driver.declineOffer(OFFER_A_ID, OFFER_FILTER);
-    expectLastCall();
-
+  public void testGetOffer() {
     control.replay();
 
     offerManager.addOffer(OFFER_A);
     assertEquals(Optional.of(OFFER_A), offerManager.getOffer(OFFER_A.getOffer().getAgentId()));
-    assertEquals(1L, statsProvider.getLongValue(OUTSTANDING_OFFERS));
-    clock.advance(RETURN_DELAY);
+    assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS));
   }
 
   @Test(expected = OfferManager.LaunchException.class)
@@ -333,12 +283,7 @@ public class OfferManagerImplTest extends EasyMockTest {
     control.replay();
 
     offerManager.addOffer(OFFER_A);
-
-    try {
-      offerManager.launchTask(OFFER_A_ID, TASK_INFO);
-    } finally {
-      clock.advance(RETURN_DELAY);
-    }
+    offerManager.launchTask(OFFER_A_ID, TASK_INFO);
   }
 
   @Test
@@ -348,66 +293,48 @@ public class OfferManagerImplTest extends EasyMockTest {
       offerManager.launchTask(OFFER_A_ID, TASK_INFO);
       fail("Method invocation is expected to throw exception.");
     } catch (OfferManager.LaunchException e) {
-      assertEquals(1L, statsProvider.getLongValue(OFFER_ACCEPT_RACES));
+      assertEquals(1, statsProvider.getLongValue(OFFER_ACCEPT_RACES));
     }
   }
 
   @Test
-  public void testFlushOffers() throws Exception {
+  public void testFlushOffers() {
     control.replay();
 
     offerManager.addOffer(OFFER_A);
     offerManager.addOffer(OFFER_B);
-    assertEquals(2L, statsProvider.getLongValue(OUTSTANDING_OFFERS));
+    assertEquals(2, statsProvider.getLongValue(OUTSTANDING_OFFERS));
     offerManager.driverDisconnected(new DriverDisconnected());
-    assertEquals(0L, statsProvider.getLongValue(OUTSTANDING_OFFERS));
-    clock.advance(RETURN_DELAY);
+    assertEquals(0, statsProvider.getLongValue(OUTSTANDING_OFFERS));
   }
 
   @Test
-  public void testCancelFailure() throws Exception {
+  public void testCancelFailure() {
     control.replay();
 
     offerManager.cancelOffer(OFFER_A.getOffer().getId());
-    assertEquals(1L, statsProvider.getLongValue(OFFER_CANCEL_FAILURES));
-  }
-
-  @Test
-  public void testDeclineOffer() throws Exception {
-    driver.declineOffer(OFFER_A.getOffer().getId(), OFFER_FILTER);
-    expectLastCall();
-
-    control.replay();
-
-    offerManager.addOffer(OFFER_A);
-    assertEquals(1L, statsProvider.getLongValue(OUTSTANDING_OFFERS));
-    clock.advance(RETURN_DELAY);
-    assertEquals(0L, statsProvider.getLongValue(OUTSTANDING_OFFERS));
+    assertEquals(1, statsProvider.getLongValue(OFFER_CANCEL_FAILURES));
   }
 
   @Test
-  public void testBanAndUnbanOffer() throws Exception {
-    driver.declineOffer(OFFER_A.getOffer().getId(), OFFER_FILTER);
-    expectLastCall().times(2);
+  public void testBanAndUnbanOffer() {
     control.replay();
 
     // After adding a banned offer, user can see it is in OUTSTANDING_OFFERS but cannot retrieve it.
     offerManager.banOffer(OFFER_A_ID);
     offerManager.addOffer(OFFER_A);
-    assertEquals(1L, statsProvider.getLongValue(OUTSTANDING_OFFERS));
-    assertEquals(1L, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS));
+    assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS));
+    assertEquals(1, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS));
     assertTrue(Iterables.isEmpty(offerManager.getOffers(GROUP_KEY)));
-    clock.advance(RETURN_DELAY);
 
     offerManager.cancelOffer(OFFER_A_ID);
     offerManager.addOffer(OFFER_A);
-    assertEquals(1L, statsProvider.getLongValue(OUTSTANDING_OFFERS));
-    assertEquals(0L, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS));
+    assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS));
+    assertEquals(0, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS));
     assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers(GROUP_KEY)));
-    clock.advance(RETURN_DELAY);
   }
 
-  private static HostOffer setUnavailability(HostOffer offer, Long startMs) {
+  private static HostOffer setUnavailability(HostOffer offer, long startMs) {
     Unavailability unavailability = Unavailability.newBuilder()
         .setStart(TimeInfo.newBuilder().setNanoseconds(startMs * 1000L)).build();
     return new HostOffer(
@@ -422,17 +349,13 @@ public class OfferManagerImplTest extends EasyMockTest {
   }
 
   private OfferManager createOrderedManager(List<OfferOrder> order) {
-    OfferSettings settings = new OfferSettings(
-        Amount.of(OFFER_FILTER_SECONDS, Time.SECONDS),
-        () -> RETURN_DELAY,
-        order);
-    DelayExecutor executorMock = createMock(DelayExecutor.class);
-    clock = FakeScheduledExecutor.fromDelayExecutor(executorMock);
-    return new OfferManagerImpl(driver, settings, statsProvider, executorMock);
+    OfferSettings settings =
+        new OfferSettings(Amount.of(OFFER_FILTER_SECONDS, Time.SECONDS), order);
+    return new OfferManagerImpl(driver, settings, statsProvider, new Noop());
   }
 
   @Test
-  public void testCPUOrdering() throws Exception {
+  public void testCPUOrdering() {
     OfferManager cpuManager = createOrderedManager(ImmutableList.of(OfferOrder.CPU));
 
     HostOffer small = setMode(new HostOffer(
@@ -464,7 +387,7 @@ public class OfferManagerImplTest extends EasyMockTest {
   }
 
   @Test
-  public void testRevocableCPUOrdering() throws Exception {
+  public void testRevocableCPUOrdering() {
     ResourceType.initializeEmptyCliArgsForTest();
     OfferManager cpuManager = createOrderedManager(ImmutableList.of(OfferOrder.REVOCABLE_CPU));
 
@@ -501,7 +424,7 @@ public class OfferManagerImplTest extends EasyMockTest {
   }
 
   @Test
-  public void testDiskOrdering() throws Exception {
+  public void testDiskOrdering() {
     OfferManager cpuManager = createOrderedManager(ImmutableList.of(OfferOrder.DISK));
 
     HostOffer small = setMode(new HostOffer(
@@ -529,7 +452,7 @@ public class OfferManagerImplTest extends EasyMockTest {
   }
 
   @Test
-  public void testMemoryOrdering() throws Exception {
+  public void testMemoryOrdering() {
     OfferManager cpuManager = createOrderedManager(ImmutableList.of(OfferOrder.MEMORY));
 
     HostOffer small = setMode(new HostOffer(
@@ -557,7 +480,7 @@ public class OfferManagerImplTest extends EasyMockTest {
   }
 
   @Test
-  public void testCPUMemoryOrdering() throws Exception {
+  public void testCPUMemoryOrdering() {
     OfferManager cpuManager = createOrderedManager(
         ImmutableList.of(OfferOrder.CPU, OfferOrder.MEMORY));
 
@@ -594,4 +517,32 @@ public class OfferManagerImplTest extends EasyMockTest {
     assertEquals(ImmutableList.of(small, medium, large),
         ImmutableList.copyOf(cpuManager.getOffers()));
   }
+
+  @Test
+  public void testDelayedOfferReturn() {
+    OfferSettings settings = new OfferSettings(
+        Amount.of(OFFER_FILTER_SECONDS, Time.SECONDS),
+        ImmutableList.of(OfferOrder.RANDOM));
+    DelayExecutor executorMock = createMock(DelayExecutor.class);
+    FakeScheduledExecutor clock = FakeScheduledExecutor.fromDelayExecutor(executorMock);
+    addTearDown(clock::assertEmpty);
+    offerManager = new OfferManagerImpl(
+        driver,
+        settings,
+        statsProvider,
+        new Deferment.DelayedDeferment(() -> RETURN_DELAY, executorMock));
+
+    driver.declineOffer(OFFER_A_ID, OFFER_FILTER);
+
+    control.replay();
+
+    offerManager.addOffer(OFFER_A);
+    offerManager.banOfferForTaskGroup(OFFER_A_ID, GROUP_KEY);
+    assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS));
+    assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
+
+    clock.advance(RETURN_DELAY);
+    assertEquals(0, statsProvider.getLongValue(OUTSTANDING_OFFERS));
+    assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
+  }
 }