You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2015/08/05 21:40:13 UTC
aurora git commit: Integrating DelayExecutor into the scheduler's
transaction handling.
Repository: aurora
Updated Branches:
refs/heads/master ae6e8575b -> 61c63ea9e
Integrating DelayExecutor into the scheduler's transaction handling.
Bugs closed: AURORA-1395
Reviewed at https://reviews.apache.org/r/37049/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/61c63ea9
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/61c63ea9
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/61c63ea9
Branch: refs/heads/master
Commit: 61c63ea9e79675bd415fbae0cd5d16f51ebd63f2
Parents: ae6e857
Author: Bill Farner <wf...@apache.org>
Authored: Wed Aug 5 12:39:52 2015 -0700
Committer: Bill Farner <wf...@apache.org>
Committed: Wed Aug 5 12:39:52 2015 -0700
----------------------------------------------------------------------
.../aurora/benchmark/SchedulingBenchmarks.java | 40 ++++-------
.../aurora/benchmark/ThriftApiBenchmarks.java | 2 +
.../aurora/scheduler/async/AsyncModule.java | 76 ++++++++++++--------
.../scheduler/async/GatedDelayExecutor.java | 7 ++
.../aurora/scheduler/offers/OfferManager.java | 12 ++--
.../scheduler/pruning/TaskHistoryPruner.java | 14 ++--
.../scheduler/reconciliation/KillRetry.java | 11 +--
.../reconciliation/ReconciliationModule.java | 20 ++++++
.../reconciliation/TaskReconciler.java | 4 +-
.../scheduler/reconciliation/TaskTimeout.java | 18 +++--
.../aurora/scheduler/scheduling/TaskGroups.java | 64 +++--------------
.../scheduler/scheduling/TaskThrottler.java | 15 ++--
.../aurora/scheduler/storage/db/DbModule.java | 29 +++++---
.../aurora/scheduler/storage/db/DbStorage.java | 23 +++++-
.../aurora/scheduler/storage/db/DbUtil.java | 2 +
.../aurora/scheduler/async/AsyncModuleTest.java | 6 +-
.../scheduler/http/JettyServerModuleTest.java | 5 ++
.../http/api/security/HttpSecurityIT.java | 7 --
.../scheduler/offers/OfferManagerImplTest.java | 6 +-
.../pruning/TaskHistoryPrunerTest.java | 62 +++++++++-------
.../scheduler/reconciliation/KillRetryTest.java | 17 ++---
.../reconciliation/TaskTimeoutTest.java | 16 ++---
.../scheduler/scheduling/TaskGroupsTest.java | 12 ++--
.../scheduler/scheduling/TaskThrottlerTest.java | 14 ++--
.../scheduler/storage/db/DbStorageTest.java | 6 ++
.../testing/FakeScheduledExecutor.java | 35 ++++++++-
26 files changed, 286 insertions(+), 237 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
index d75f090..e41b299 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
@@ -14,14 +14,11 @@
package org.apache.aurora.benchmark;
import java.util.Set;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.inject.Singleton;
import com.google.common.eventbus.EventBus;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
@@ -39,6 +36,7 @@ import org.apache.aurora.benchmark.fakes.FakeStatsProvider;
import org.apache.aurora.scheduler.HostOffer;
import org.apache.aurora.scheduler.TaskIdGenerator;
import org.apache.aurora.scheduler.async.AsyncModule;
+import org.apache.aurora.scheduler.async.DelayExecutor;
import org.apache.aurora.scheduler.events.EventSink;
import org.apache.aurora.scheduler.events.PubsubEvent;
import org.apache.aurora.scheduler.filter.SchedulingFilter;
@@ -90,7 +88,6 @@ public class SchedulingBenchmarks {
private static final Amount<Long, Time> DELAY_FOREVER = Amount.of(30L, Time.DAYS);
protected Storage storage;
protected PendingTaskProcessor pendingTaskProcessor;
- protected ScheduledThreadPoolExecutor executor;
private TaskScheduler taskScheduler;
private OfferManager offerManager;
private EventBus eventBus;
@@ -105,11 +102,6 @@ public class SchedulingBenchmarks {
eventBus = new EventBus();
final FakeClock clock = new FakeClock();
clock.setNowMillis(System.currentTimeMillis());
- executor = new ScheduledThreadPoolExecutor(
- 1,
- new ThreadFactoryBuilder()
- .setDaemon(true)
- .setNameFormat("TestProcessor-%d").build());
// TODO(maxim): Find a way to DRY it and reuse existing modules instead.
Injector injector = Guice.createInjector(
@@ -118,18 +110,23 @@ public class SchedulingBenchmarks {
new PrivateModule() {
@Override
protected void configure() {
- bind(ScheduledExecutorService.class)
- .annotatedWith(AsyncModule.AsyncExecutor.class)
- .toInstance(executor);
- bind(OfferManager.class).to(OfferManager.OfferManagerImpl.class);
- bind(OfferManager.OfferManagerImpl.class).in(Singleton.class);
- bind(OfferManager.OfferReturnDelay.class).toInstance(
- new OfferManager.OfferReturnDelay() {
+ // We use a no-op executor for async work, as this benchmark is focused on the
+ // synchronous scheduling operations.
+ bind(DelayExecutor.class).annotatedWith(AsyncModule.AsyncExecutor.class)
+ .toInstance(new DelayExecutor() {
+ @Override
+ public void execute(Runnable work, Amount<Long, Time> minDelay) {
+ // No-op.
+ }
+
@Override
- public Amount<Long, Time> get() {
- return DELAY_FOREVER;
+ public void execute(Runnable command) {
+ // No-op.
}
});
+ bind(OfferManager.class).to(OfferManager.OfferManagerImpl.class);
+ bind(OfferManager.OfferManagerImpl.class).in(Singleton.class);
+ bind(OfferManager.OfferReturnDelay.class).toInstance(() -> DELAY_FOREVER);
bind(BiCache.BiCacheSettings.class).toInstance(
new BiCache.BiCacheSettings(DELAY_FOREVER, ""));
bind(TaskScheduler.class).to(TaskScheduler.TaskSchedulerImpl.class);
@@ -183,13 +180,6 @@ public class SchedulingBenchmarks {
saveTasks(settings.getTasks());
}
- @Setup(Level.Iteration)
- public void setUpIteration() {
- // Clear executor queue between iterations. Otherwise, executor tasks keep piling up and
- // affect benchmark performance due to memory pressure and excessive GC.
- executor.getQueue().clear();
- }
-
private Set<IScheduledTask> buildClusterTasks(int numOffers) {
int numOffersToFill = (int) Math.round(numOffers * settings.getClusterUtilization());
return new Tasks.Builder()
http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java
index b2a3e9b..4ddfea2 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java
@@ -32,6 +32,7 @@ import org.apache.aurora.gen.ReadOnlyScheduler;
import org.apache.aurora.gen.Response;
import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.gen.TaskQuery;
+import org.apache.aurora.scheduler.async.AsyncModule;
import org.apache.aurora.scheduler.cron.CronPredictor;
import org.apache.aurora.scheduler.quota.QuotaManager;
import org.apache.aurora.scheduler.state.LockManager;
@@ -151,6 +152,7 @@ public class ThriftApiBenchmarks {
bind(StatsProvider.class).toInstance(new FakeStatsProvider());
}
},
+ new AsyncModule(),
DbModule.productionModule(Bindings.KeyFactory.PLAIN),
new ThriftModule.ReadOnly());
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
index c345c92..8416ea0 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
@@ -15,18 +15,18 @@ package org.apache.aurora.scheduler.async;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.logging.Logger;
import javax.inject.Inject;
import javax.inject.Qualifier;
+import javax.inject.Singleton;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Supplier;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.inject.AbstractModule;
+import com.google.inject.PrivateModule;
import com.twitter.common.args.Arg;
import com.twitter.common.args.CmdLine;
import com.twitter.common.stats.StatsProvider;
@@ -48,61 +48,77 @@ public class AsyncModule extends AbstractModule {
@CmdLine(name = "async_worker_threads",
help = "The number of worker threads to process async task operations with.")
- private static final Arg<Integer> ASYNC_WORKER_THREADS = Arg.create(1);
+ private static final Arg<Integer> ASYNC_WORKER_THREADS = Arg.create(8);
+ private final ScheduledThreadPoolExecutor afterTransaction;
@Qualifier
@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
public @interface AsyncExecutor { }
- @VisibleForTesting
- static final String TIMEOUT_QUEUE_GAUGE = "timeout_queue_size";
+ public AsyncModule() {
+ // Don't worry about clean shutdown, these can be daemon and cleanup-free.
+ // TODO(wfarner): Should we use a bounded caching thread pool executor instead?
+ this(AsyncUtil.loggingScheduledExecutor(ASYNC_WORKER_THREADS.get(), "AsyncProcessor-%d", LOG));
+ }
@VisibleForTesting
- static final String ASYNC_TASKS_GAUGE = "async_tasks_completed";
+ public AsyncModule(ScheduledThreadPoolExecutor executor) {
+ this.afterTransaction = requireNonNull(executor);
+ }
@Override
protected void configure() {
- // Don't worry about clean shutdown, these can be daemon and cleanup-free.
- final ScheduledThreadPoolExecutor executor =
- AsyncUtil.loggingScheduledExecutor(ASYNC_WORKER_THREADS.get(), "AsyncProcessor-%d", LOG);
- bind(ScheduledThreadPoolExecutor.class).annotatedWith(AsyncExecutor.class).toInstance(executor);
- bind(ScheduledExecutorService.class).annotatedWith(AsyncExecutor.class).toInstance(executor);
- bind(ExecutorService.class).annotatedWith(AsyncExecutor.class).toInstance(executor);
+ install(new PrivateModule() {
+ @Override
+ protected void configure() {
+ bind(ScheduledThreadPoolExecutor.class).toInstance(afterTransaction);
+ bind(ScheduledExecutorService.class).toInstance(afterTransaction);
+
+ bind(GatedDelayExecutor.class).in(Singleton.class);
+ expose(GatedDelayExecutor.class);
+
+ bind(RegisterGauges.class).in(Singleton.class);
+ expose(RegisterGauges.class);
+ }
+ });
SchedulerServicesModule.addAppStartupServiceBinding(binder()).to(RegisterGauges.class);
+
+ bind(DelayExecutor.class).annotatedWith(AsyncExecutor.class).to(GatedDelayExecutor.class);
+ bind(FlushableWorkQueue.class).annotatedWith(AsyncExecutor.class).to(GatedDelayExecutor.class);
}
static class RegisterGauges extends AbstractIdleService {
+ @VisibleForTesting
+ static final String TIMEOUT_QUEUE_GAUGE = "timeout_queue_size";
+
+ @VisibleForTesting
+ static final String ASYNC_TASKS_GAUGE = "async_tasks_completed";
+
+ @VisibleForTesting
+ static final String DELAY_QUEUE_GAUGE = "delay_executor_queue_size";
+
private final StatsProvider statsProvider;
private final ScheduledThreadPoolExecutor executor;
+ private final GatedDelayExecutor delayExecutor;
@Inject
RegisterGauges(
StatsProvider statsProvider,
- @AsyncExecutor ScheduledThreadPoolExecutor executor) {
+ ScheduledThreadPoolExecutor executor,
+ GatedDelayExecutor delayExecutor) {
this.statsProvider = requireNonNull(statsProvider);
this.executor = requireNonNull(executor);
+ this.delayExecutor = requireNonNull(delayExecutor);
}
@Override
protected void startUp() {
- statsProvider.makeGauge(
- TIMEOUT_QUEUE_GAUGE,
- new Supplier<Integer>() {
- @Override
- public Integer get() {
- return executor.getQueue().size();
- }
- });
- statsProvider.makeGauge(
- ASYNC_TASKS_GAUGE,
- new Supplier<Long>() {
- @Override
- public Long get() {
- return executor.getCompletedTaskCount();
- }
- }
- );
+ statsProvider.makeGauge(TIMEOUT_QUEUE_GAUGE, () -> executor.getQueue().size());
+ statsProvider.makeGauge(ASYNC_TASKS_GAUGE, executor::getCompletedTaskCount);
+ // Using a lambda rather than method ref to sidestep a bug in PMD that makes it think
+ // delayExecutor is unused.
+ statsProvider.makeGauge(DELAY_QUEUE_GAUGE, () -> delayExecutor.getQueueSize());
}
@Override
http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/main/java/org/apache/aurora/scheduler/async/GatedDelayExecutor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/GatedDelayExecutor.java b/src/main/java/org/apache/aurora/scheduler/async/GatedDelayExecutor.java
index 1893a9b..2889e79 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/GatedDelayExecutor.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/GatedDelayExecutor.java
@@ -16,6 +16,8 @@ package org.apache.aurora.scheduler.async;
import java.util.Queue;
import java.util.concurrent.ScheduledExecutorService;
+import javax.inject.Inject;
+
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.twitter.common.quantity.Amount;
@@ -36,10 +38,15 @@ class GatedDelayExecutor implements DelayExecutor, FlushableWorkQueue {
*
* @param delegate Delegate to execute work with when flushed.
*/
+ @Inject
GatedDelayExecutor(ScheduledExecutorService delegate) {
this.executor = requireNonNull(delegate);
}
+ synchronized int getQueueSize() {
+ return queue.size();
+ }
+
private synchronized void enqueue(Runnable work) {
queue.add(work);
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
index 4b8a55f..d1ebad1 100644
--- a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
@@ -17,8 +17,6 @@ import java.util.Comparator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;
@@ -43,6 +41,7 @@ import com.twitter.common.stats.Stats;
import org.apache.aurora.gen.MaintenanceMode;
import org.apache.aurora.scheduler.HostOffer;
import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
+import org.apache.aurora.scheduler.async.DelayExecutor;
import org.apache.aurora.scheduler.base.TaskGroupKey;
import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
@@ -159,14 +158,14 @@ public interface OfferManager extends EventSubscriber {
private final Driver driver;
private final OfferReturnDelay returnDelay;
- private final ScheduledExecutorService executor;
+ private final DelayExecutor executor;
@Inject
@VisibleForTesting
public OfferManagerImpl(
Driver driver,
OfferReturnDelay returnDelay,
- @AsyncExecutor ScheduledExecutorService executor) {
+ @AsyncExecutor DelayExecutor executor) {
this.driver = requireNonNull(driver);
this.returnDelay = requireNonNull(returnDelay);
@@ -190,15 +189,14 @@ public interface OfferManager extends EventSubscriber {
removeAndDecline(sameSlave.get().getOffer().getId());
} else {
hostOffers.add(offer);
- executor.schedule(
+ executor.execute(
new Runnable() {
@Override
public void run() {
removeAndDecline(offer.getOffer().getId());
}
},
- returnDelay.get().as(Time.MILLISECONDS),
- TimeUnit.MILLISECONDS);
+ returnDelay.get());
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
index fa9a09c..3cff5bb 100644
--- a/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
+++ b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
@@ -14,8 +14,6 @@
package org.apache.aurora.scheduler.pruning;
import java.util.Set;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import javax.inject.Inject;
@@ -32,6 +30,7 @@ import com.twitter.common.util.Clock;
import org.apache.aurora.gen.apiConstants;
import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
+import org.apache.aurora.scheduler.async.DelayExecutor;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.state.StateManager;
@@ -51,7 +50,7 @@ import static org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
public class TaskHistoryPruner implements EventSubscriber {
private static final Logger LOG = Logger.getLogger(TaskHistoryPruner.class.getName());
- private final ScheduledExecutorService executor;
+ private final DelayExecutor executor;
private final StateManager stateManager;
private final Clock clock;
private final HistoryPrunnerSettings settings;
@@ -83,7 +82,7 @@ public class TaskHistoryPruner implements EventSubscriber {
@Inject
TaskHistoryPruner(
- @AsyncExecutor ScheduledExecutorService executor,
+ @AsyncExecutor DelayExecutor executor,
StateManager stateManager,
Clock clock,
HistoryPrunnerSettings settings,
@@ -142,7 +141,7 @@ public class TaskHistoryPruner implements EventSubscriber {
long timeRemaining) {
LOG.fine("Prune task " + taskId + " in " + timeRemaining + " ms.");
- executor.schedule(
+ executor.execute(
new Runnable() {
@Override
public void run() {
@@ -150,10 +149,9 @@ public class TaskHistoryPruner implements EventSubscriber {
deleteTasks(ImmutableSet.of(taskId));
}
},
- timeRemaining,
- TimeUnit.MILLISECONDS);
+ Amount.of(timeRemaining, Time.MILLISECONDS));
- executor.submit(new Runnable() {
+ executor.execute(new Runnable() {
@Override
public void run() {
Iterable<IScheduledTask> inactiveTasks =
http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/main/java/org/apache/aurora/scheduler/reconciliation/KillRetry.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/reconciliation/KillRetry.java b/src/main/java/org/apache/aurora/scheduler/reconciliation/KillRetry.java
index 1611a3b..b422fa1 100644
--- a/src/main/java/org/apache/aurora/scheduler/reconciliation/KillRetry.java
+++ b/src/main/java/org/apache/aurora/scheduler/reconciliation/KillRetry.java
@@ -13,8 +13,6 @@
*/
package org.apache.aurora.scheduler.reconciliation;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;
@@ -23,11 +21,14 @@ import javax.inject.Inject;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import com.google.common.eventbus.Subscribe;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
import com.twitter.common.stats.StatsProvider;
import com.twitter.common.util.BackoffStrategy;
import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
+import org.apache.aurora.scheduler.async.DelayExecutor;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
@@ -48,7 +49,7 @@ public class KillRetry implements EventSubscriber {
private final Driver driver;
private final Storage storage;
- private final ScheduledExecutorService executor;
+ private final DelayExecutor executor;
private final BackoffStrategy backoffStrategy;
private final AtomicLong killRetries;
@@ -56,7 +57,7 @@ public class KillRetry implements EventSubscriber {
KillRetry(
Driver driver,
Storage storage,
- @AsyncExecutor ScheduledExecutorService executor,
+ @AsyncExecutor DelayExecutor executor,
BackoffStrategy backoffStrategy,
StatsProvider statsProvider) {
@@ -84,7 +85,7 @@ public class KillRetry implements EventSubscriber {
void tryLater() {
retryInMs.set(backoffStrategy.calculateBackoffMs(retryInMs.get()));
- executor.schedule(this, retryInMs.get(), TimeUnit.MILLISECONDS);
+ executor.execute(this, Amount.of(retryInMs.get(), Time.MILLISECONDS));
}
@Override
http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/main/java/org/apache/aurora/scheduler/reconciliation/ReconciliationModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/reconciliation/ReconciliationModule.java b/src/main/java/org/apache/aurora/scheduler/reconciliation/ReconciliationModule.java
index 406c077..2677238 100644
--- a/src/main/java/org/apache/aurora/scheduler/reconciliation/ReconciliationModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/reconciliation/ReconciliationModule.java
@@ -13,6 +13,12 @@
*/
package org.apache.aurora.scheduler.reconciliation;
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.logging.Logger;
+
+import javax.inject.Qualifier;
import javax.inject.Singleton;
import com.google.inject.AbstractModule;
@@ -27,14 +33,22 @@ import com.twitter.common.util.BackoffStrategy;
import com.twitter.common.util.TruncatedBinaryBackoff;
import org.apache.aurora.scheduler.SchedulerServicesModule;
+import org.apache.aurora.scheduler.base.AsyncUtil;
import org.apache.aurora.scheduler.events.PubsubEventModule;
import org.apache.aurora.scheduler.reconciliation.TaskReconciler.TaskReconcilerSettings;
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
/**
* Binding module for state reconciliation and retry logic.
*/
public class ReconciliationModule extends AbstractModule {
+ private static final Logger LOG = Logger.getLogger(ReconciliationModule.class.getName());
+
@CmdLine(name = "transient_task_state_timeout",
help = "The amount of time after which to treat a task stuck in a transient state as LOST.")
private static final Arg<Amount<Long, Time>> TRANSIENT_TASK_STATE_TIMEOUT =
@@ -73,6 +87,10 @@ public class ReconciliationModule extends AbstractModule {
private static final Arg<Amount<Long, Time>> RECONCILIATION_SCHEDULE_SPREAD =
Arg.create(Amount.of(30L, Time.MINUTES));
+ @Qualifier
+ @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+ @interface BackgroundWorker { }
+
@Override
protected void configure() {
install(new PrivateModule() {
@@ -109,6 +127,8 @@ public class ReconciliationModule extends AbstractModule {
RECONCILIATION_EXPLICIT_INTERVAL.get(),
RECONCILIATION_IMPLICIT_INTERVAL.get(),
RECONCILIATION_SCHEDULE_SPREAD.get()));
+ bind(ScheduledExecutorService.class).annotatedWith(BackgroundWorker.class)
+ .toInstance(AsyncUtil.loggingScheduledExecutor(1, "TaskReconciler-%d", LOG));
bind(TaskReconciler.class).in(Singleton.class);
expose(TaskReconciler.class);
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java b/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java
index 653e52b..8f866a8 100644
--- a/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java
+++ b/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskReconciler.java
@@ -27,10 +27,10 @@ import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;
import com.twitter.common.stats.StatsProvider;
-import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.mesos.Driver;
+import org.apache.aurora.scheduler.reconciliation.ReconciliationModule.BackgroundWorker;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.mesos.Protos;
@@ -91,7 +91,7 @@ public class TaskReconciler extends AbstractIdleService {
TaskReconcilerSettings settings,
Storage storage,
Driver driver,
- @AsyncExecutor ScheduledExecutorService executor,
+ @BackgroundWorker ScheduledExecutorService executor,
StatsProvider stats) {
this.settings = requireNonNull(settings);
http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java b/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java
index fb83972..72a46f0 100644
--- a/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java
+++ b/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java
@@ -15,7 +15,6 @@ package org.apache.aurora.scheduler.reconciliation;
import java.util.EnumSet;
import java.util.Set;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;
@@ -31,6 +30,7 @@ import com.twitter.common.stats.StatsProvider;
import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
+import org.apache.aurora.scheduler.async.DelayExecutor;
import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
import org.apache.aurora.scheduler.state.StateChangeResult;
@@ -65,7 +65,7 @@ class TaskTimeout extends AbstractIdleService implements EventSubscriber {
ScheduleStatus.KILLING,
ScheduleStatus.DRAINING);
- private final ScheduledExecutorService executor;
+ private final DelayExecutor executor;
private final Storage storage;
private final StateManager stateManager;
private final Amount<Long, Time> timeout;
@@ -73,7 +73,7 @@ class TaskTimeout extends AbstractIdleService implements EventSubscriber {
@Inject
TaskTimeout(
- @AsyncExecutor ScheduledExecutorService executor,
+ @AsyncExecutor DelayExecutor executor,
Storage storage,
StateManager stateManager,
Amount<Long, Time> timeout,
@@ -138,10 +138,9 @@ class TaskTimeout extends AbstractIdleService implements EventSubscriber {
// Our service is not yet started. We don't want to lose track of the task, so
// we will try again later.
LOG.fine("Retrying timeout of task " + taskId + " in " + NOT_STARTED_RETRY);
- executor.schedule(
- this,
- NOT_STARTED_RETRY.getValue(),
- NOT_STARTED_RETRY.getUnit().getTimeUnit());
+ // TODO(wfarner): This execution should not wait for a transaction, but a second executor
+ // would be weird.
+ executor.execute(this, NOT_STARTED_RETRY);
}
}
}
@@ -149,10 +148,9 @@ class TaskTimeout extends AbstractIdleService implements EventSubscriber {
@Subscribe
public void recordStateChange(TaskStateChange change) {
if (isTransient(change.getNewState())) {
- executor.schedule(
+ executor.execute(
new TimedOutTaskHandler(change.getTaskId(), change.getNewState()),
- timeout.getValue(),
- timeout.getUnit().getTimeUnit());
+ timeout);
}
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java
index e60daad..eaf784e 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java
@@ -14,13 +14,9 @@
package org.apache.aurora.scheduler.scheduling;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.logging.Logger;
import javax.inject.Inject;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
@@ -28,16 +24,13 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.eventbus.Subscribe;
import com.google.common.util.concurrent.RateLimiter;
-import com.twitter.common.application.ShutdownRegistry;
-import com.twitter.common.base.Command;
import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;
import com.twitter.common.stats.SlidingStats;
-import com.twitter.common.stats.Stats;
import com.twitter.common.util.BackoffStrategy;
-import com.twitter.common.util.concurrent.ExecutorServiceShutdown;
-import org.apache.aurora.scheduler.base.AsyncUtil;
+import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
+import org.apache.aurora.scheduler.async.DelayExecutor;
import org.apache.aurora.scheduler.base.TaskGroupKey;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
@@ -47,7 +40,6 @@ import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import static java.util.Objects.requireNonNull;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.aurora.gen.ScheduleStatus.PENDING;
@@ -63,10 +55,8 @@ import static org.apache.aurora.gen.ScheduleStatus.PENDING;
*/
public class TaskGroups implements EventSubscriber {
- private static final Logger LOG = Logger.getLogger(TaskGroups.class.getName());
-
private final ConcurrentMap<TaskGroupKey, TaskGroup> groups = Maps.newConcurrentMap();
- private final ScheduledExecutorService executor;
+ private final DelayExecutor executor;
private final TaskScheduler taskScheduler;
private final long firstScheduleDelay;
private final BackoffStrategy backoff;
@@ -95,43 +85,25 @@ public class TaskGroups implements EventSubscriber {
@Inject
TaskGroups(
- ShutdownRegistry shutdownRegistry,
+ @AsyncExecutor DelayExecutor executor,
TaskGroupsSettings settings,
TaskScheduler taskScheduler,
RescheduleCalculator rescheduleCalculator) {
- this(
- createThreadPool(shutdownRegistry),
- settings.firstScheduleDelay,
- settings.taskGroupBackoff,
- settings.rateLimiter,
- taskScheduler,
- rescheduleCalculator);
- }
-
- @VisibleForTesting
- TaskGroups(
- final ScheduledExecutorService executor,
- final Amount<Long, Time> firstScheduleDelay,
- final BackoffStrategy backoff,
- final RateLimiter rateLimiter,
- final TaskScheduler taskScheduler,
- final RescheduleCalculator rescheduleCalculator) {
-
- requireNonNull(firstScheduleDelay);
- Preconditions.checkArgument(firstScheduleDelay.getValue() > 0);
+ requireNonNull(settings.firstScheduleDelay);
+ Preconditions.checkArgument(settings.firstScheduleDelay.getValue() > 0);
this.executor = requireNonNull(executor);
- requireNonNull(rateLimiter);
+ requireNonNull(settings.rateLimiter);
requireNonNull(taskScheduler);
- this.firstScheduleDelay = firstScheduleDelay.as(Time.MILLISECONDS);
- this.backoff = requireNonNull(backoff);
+ this.firstScheduleDelay = settings.firstScheduleDelay.as(Time.MILLISECONDS);
+ this.backoff = requireNonNull(settings.taskGroupBackoff);
this.rescheduleCalculator = requireNonNull(rescheduleCalculator);
this.taskScheduler = new TaskScheduler() {
@Override
public boolean schedule(String taskId) {
- rateLimiter.acquire();
+ settings.rateLimiter.acquire();
return taskScheduler.schedule(taskId);
}
};
@@ -141,7 +113,7 @@ public class TaskGroups implements EventSubscriber {
// Avoid check-then-act by holding the intrinsic lock. If not done atomically, we could
// remove a group while a task is being added to it.
if (group.hasMore()) {
- executor.schedule(evaluate, group.getPenaltyMs(), MILLISECONDS);
+ executor.execute(evaluate, Amount.of(group.getPenaltyMs(), Time.MILLISECONDS));
} else {
groups.remove(group.getKey());
}
@@ -172,20 +144,6 @@ public class TaskGroups implements EventSubscriber {
evaluateGroupLater(monitor, group);
}
- private static ScheduledExecutorService createThreadPool(ShutdownRegistry shutdownRegistry) {
- final ScheduledThreadPoolExecutor executor =
- AsyncUtil.singleThreadLoggingScheduledExecutor("TaskScheduler-%d", LOG);
-
- Stats.exportSize("schedule_queue_size", executor.getQueue());
- shutdownRegistry.addAction(new Command() {
- @Override
- public void execute() {
- new ExecutorServiceShutdown(executor, Amount.of(1L, Time.SECONDS)).execute();
- }
- });
- return executor;
- }
-
/**
* Informs the task groups of a task state change.
* <p>
http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java
index e54e6c4..fdc5bd7 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java
@@ -13,17 +13,17 @@
*/
package org.apache.aurora.scheduler.scheduling;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
import javax.inject.Inject;
import com.google.common.base.Optional;
import com.google.common.eventbus.Subscribe;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
import com.twitter.common.stats.SlidingStats;
import com.twitter.common.util.Clock;
import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
+import org.apache.aurora.scheduler.async.DelayExecutor;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
@@ -45,7 +45,7 @@ class TaskThrottler implements EventSubscriber {
private final RescheduleCalculator rescheduleCalculator;
private final Clock clock;
- private final ScheduledExecutorService executor;
+ private final DelayExecutor executor;
private final Storage storage;
private final StateManager stateManager;
@@ -55,7 +55,7 @@ class TaskThrottler implements EventSubscriber {
TaskThrottler(
RescheduleCalculator rescheduleCalculator,
Clock clock,
- @AsyncExecutor ScheduledExecutorService executor,
+ @AsyncExecutor DelayExecutor executor,
Storage storage,
StateManager stateManager) {
@@ -73,7 +73,7 @@ class TaskThrottler implements EventSubscriber {
+ rescheduleCalculator.getFlappingPenaltyMs(stateChange.getTask());
long delayMs = Math.max(0, readyAtMs - clock.nowMillis());
throttleStats.accumulate(delayMs);
- executor.schedule(
+ executor.execute(
new Runnable() {
@Override
public void run() {
@@ -90,8 +90,7 @@ class TaskThrottler implements EventSubscriber {
});
}
},
- delayMs,
- TimeUnit.MILLISECONDS);
+ Amount.of(delayMs, Time.MILLISECONDS));
}
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java
index ed92661..f0620b9 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java
@@ -30,6 +30,7 @@ import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.PrivateModule;
import com.google.inject.TypeLiteral;
+import com.google.inject.util.Modules;
import com.twitter.common.args.Arg;
import com.twitter.common.args.CmdLine;
import com.twitter.common.inject.Bindings.KeyFactory;
@@ -37,6 +38,8 @@ import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;
import org.apache.aurora.scheduler.SchedulerServicesModule;
+import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
+import org.apache.aurora.scheduler.async.FlushableWorkQueue;
import org.apache.aurora.scheduler.storage.AttributeStore;
import org.apache.aurora.scheduler.storage.CronJobStore;
import org.apache.aurora.scheduler.storage.JobUpdateStore;
@@ -149,15 +152,23 @@ public final class DbModule extends PrivateModule {
*/
@VisibleForTesting
public static Module testModule(KeyFactory keyFactory, Optional<Module> taskStoreModule) {
- return new DbModule(
- keyFactory,
- taskStoreModule.isPresent() ? taskStoreModule.get() : getTaskStoreModule(keyFactory),
- "testdb-" + UUID.randomUUID().toString(),
- // A non-zero close delay is used here to avoid eager database cleanup in tests that
- // make use of multiple threads. Since all test databases are separately scoped by the
- // included UUID, multiple DB instances will overlap in time but they should be distinct
- // in content.
- ImmutableMap.of("DB_CLOSE_DELAY", "5"));
+ return Modules.combine(
+ new AbstractModule() {
+ @Override
+ protected void configure() {
+ bind(FlushableWorkQueue.class).annotatedWith(AsyncExecutor.class).toInstance(() -> { });
+ }
+ },
+ new DbModule(
+ keyFactory,
+ taskStoreModule.isPresent() ? taskStoreModule.get() : getTaskStoreModule(keyFactory),
+ "testdb-" + UUID.randomUUID().toString(),
+ // A non-zero close delay is used here to avoid eager database cleanup in tests that
+ // make use of multiple threads. Since all test databases are separately scoped by the
+ // included UUID, multiple DB instances will overlap in time but they should be distinct
+ // in content.
+ ImmutableMap.of("DB_CLOSE_DELAY", "5"))
+ );
}
/**
http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
index a1f0d3c..aac62e2 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
@@ -28,6 +28,8 @@ import org.apache.aurora.gen.JobUpdateAction;
import org.apache.aurora.gen.JobUpdateStatus;
import org.apache.aurora.gen.MaintenanceMode;
import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
+import org.apache.aurora.scheduler.async.FlushableWorkQueue;
import org.apache.aurora.scheduler.storage.AttributeStore;
import org.apache.aurora.scheduler.storage.CronJobStore;
import org.apache.aurora.scheduler.storage.JobUpdateStore;
@@ -59,11 +61,13 @@ class DbStorage extends AbstractIdleService implements Storage {
private final SqlSessionFactory sessionFactory;
private final MutableStoreProvider storeProvider;
private final EnumValueMapper enumValueMapper;
+ private final FlushableWorkQueue postTransactionWork;
@Inject
DbStorage(
SqlSessionFactory sessionFactory,
EnumValueMapper enumValueMapper,
+ @AsyncExecutor FlushableWorkQueue postTransactionWork,
final CronJobStore.Mutable cronJobStore,
final TaskStore.Mutable taskStore,
final SchedulerStore.Mutable schedulerStore,
@@ -74,6 +78,7 @@ class DbStorage extends AbstractIdleService implements Storage {
this.sessionFactory = requireNonNull(sessionFactory);
this.enumValueMapper = requireNonNull(enumValueMapper);
+ this.postTransactionWork = requireNonNull(postTransactionWork);
requireNonNull(cronJobStore);
requireNonNull(taskStore);
requireNonNull(schedulerStore);
@@ -139,11 +144,23 @@ class DbStorage extends AbstractIdleService implements Storage {
@Override
@Transactional
public <T, E extends Exception> T write(MutateWork<T, E> work) throws StorageException, E {
- try {
- return work.apply(storeProvider);
+ T result;
+ try (SqlSession session = sessionFactory.openSession(false)) {
+ result = work.apply(storeProvider);
+ session.commit();
} catch (PersistenceException e) {
throw new StorageException(e.getMessage(), e);
+ } finally {
+ // NOTE: Async work is intentionally executed regardless of whether the transaction succeeded.
+ // Doing otherwise runs the risk of cross-talk between transactions and losing async tasks
+ // due to failure of an unrelated transaction. This matches behavior prior to the
+ // introduction of DbStorage, but should be revisited.
+ // TODO(wfarner): Consider revisiting to execute async work only when the transaction is
+ // successful.
+ postTransactionWork.flush();
}
+
+ return result;
}
@VisibleForTesting
@@ -169,6 +186,8 @@ class DbStorage extends AbstractIdleService implements Storage {
} finally {
session.update(ENABLE_UNDO_LOG);
}
+ } finally {
+ postTransactionWork.flush();
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/main/java/org/apache/aurora/scheduler/storage/db/DbUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbUtil.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbUtil.java
index 3a86614..06e7f23 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbUtil.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbUtil.java
@@ -63,6 +63,8 @@ public final class DbUtil {
/**
* Creates a new, empty test storage system.
+ * <p>
+ * TODO(wfarner): Rename this to createTestStorage() to avoid misuse.
*
* @return A new storage instance.
*/
http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/test/java/org/apache/aurora/scheduler/async/AsyncModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/AsyncModuleTest.java b/src/test/java/org/apache/aurora/scheduler/async/AsyncModuleTest.java
index 5384307..9d66685 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/AsyncModuleTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/AsyncModuleTest.java
@@ -29,6 +29,7 @@ import com.twitter.common.testing.easymock.EasyMockTest;
import com.twitter.common.util.Clock;
import org.apache.aurora.scheduler.AppStartup;
+import org.apache.aurora.scheduler.async.AsyncModule.RegisterGauges;
import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
import org.apache.aurora.scheduler.testing.FakeStatsProvider;
import org.junit.Before;
@@ -86,7 +87,10 @@ public class AsyncModuleTest extends EasyMockTest {
injector.getBindings();
assertEquals(
- ImmutableMap.of(AsyncModule.TIMEOUT_QUEUE_GAUGE, 0, AsyncModule.ASYNC_TASKS_GAUGE, 0L),
+ ImmutableMap.of(
+ RegisterGauges.TIMEOUT_QUEUE_GAUGE, 0,
+ RegisterGauges.ASYNC_TASKS_GAUGE, 0L,
+ RegisterGauges.DELAY_QUEUE_GAUGE, 0),
statsProvider.getAllValues()
);
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/test/java/org/apache/aurora/scheduler/http/JettyServerModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/http/JettyServerModuleTest.java b/src/test/java/org/apache/aurora/scheduler/http/JettyServerModuleTest.java
index 91b91bc..6a17d3a 100644
--- a/src/test/java/org/apache/aurora/scheduler/http/JettyServerModuleTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/http/JettyServerModuleTest.java
@@ -41,12 +41,14 @@ import com.twitter.common.net.pool.DynamicHostSet;
import com.twitter.common.net.pool.DynamicHostSet.HostChangeMonitor;
import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.StatsProvider;
import com.twitter.common.testing.easymock.EasyMockTest;
import com.twitter.common.util.BackoffStrategy;
import com.twitter.thrift.ServiceInstance;
import org.apache.aurora.gen.ServerInfo;
import org.apache.aurora.scheduler.SchedulerServicesModule;
+import org.apache.aurora.scheduler.async.AsyncModule;
import org.apache.aurora.scheduler.cron.CronJobManager;
import org.apache.aurora.scheduler.http.api.GsonMessageBodyHandler;
import org.apache.aurora.scheduler.offers.OfferManager;
@@ -57,6 +59,7 @@ import org.apache.aurora.scheduler.state.LockManager;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.entities.IServerInfo;
import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.apache.aurora.scheduler.testing.FakeStatsProvider;
import org.easymock.Capture;
import org.junit.Before;
@@ -96,6 +99,7 @@ public abstract class JettyServerModuleTest extends EasyMockTest {
new StatsModule(),
new LifecycleModule(),
new SchedulerServicesModule(),
+ new AsyncModule(),
new AbstractModule() {
<T> T bindMock(Class<T> clazz) {
T mock = createMock(clazz);
@@ -105,6 +109,7 @@ public abstract class JettyServerModuleTest extends EasyMockTest {
@Override
protected void configure() {
+ bind(StatsProvider.class).toInstance(new FakeStatsProvider());
bind(Storage.class).toInstance(storage.storage);
bind(IServerInfo.class).toInstance(IServerInfo.build(new ServerInfo()
.setClusterName("unittest")
http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/test/java/org/apache/aurora/scheduler/http/api/security/HttpSecurityIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/http/api/security/HttpSecurityIT.java b/src/test/java/org/apache/aurora/scheduler/http/api/security/HttpSecurityIT.java
index e725e10..a5703e5 100644
--- a/src/test/java/org/apache/aurora/scheduler/http/api/security/HttpSecurityIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/http/api/security/HttpSecurityIT.java
@@ -15,7 +15,6 @@ package org.apache.aurora.scheduler.http.api.security;
import java.io.IOException;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableSet;
@@ -25,7 +24,6 @@ import com.google.inject.AbstractModule;
import com.google.inject.Module;
import com.google.inject.util.Modules;
import com.sun.jersey.api.client.ClientResponse;
-import com.twitter.common.stats.StatsProvider;
import org.apache.aurora.gen.AuroraAdmin;
import org.apache.aurora.gen.Lock;
@@ -62,7 +60,6 @@ import org.junit.Test;
import static org.apache.aurora.scheduler.http.H2ConsoleModule.H2_PATH;
import static org.apache.aurora.scheduler.http.H2ConsoleModule.H2_PERM;
import static org.apache.aurora.scheduler.http.api.ApiModule.API_PATH;
-import static org.easymock.EasyMock.anyString;
import static org.easymock.EasyMock.expect;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@@ -98,7 +95,6 @@ public class HttpSecurityIT extends JettyServerModuleTest {
private Ini ini;
private AnnotatedAuroraAdmin auroraAdmin;
- private StatsProvider statsProvider;
private static final Joiner COMMA_JOINER = Joiner.on(", ");
private static final String ADMIN_ROLE = "admin";
@@ -138,8 +134,6 @@ public class HttpSecurityIT extends JettyServerModuleTest {
roles.put(H2_ROLE, H2_PERM);
auroraAdmin = createMock(AnnotatedAuroraAdmin.class);
- statsProvider = createMock(StatsProvider.class);
- expect(statsProvider.makeCounter(anyString())).andStubReturn(new AtomicLong());
}
@Override
@@ -152,7 +146,6 @@ public class HttpSecurityIT extends JettyServerModuleTest {
@Override
protected void configure() {
MockDecoratedThrift.bindForwardedMock(binder(), auroraAdmin);
- bind(StatsProvider.class).toInstance(statsProvider);
}
});
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
index 088a4a6..1cc9ec4 100644
--- a/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
@@ -13,7 +13,6 @@
*/
package org.apache.aurora.scheduler.offers;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.logging.Level;
import com.google.common.base.Optional;
@@ -29,6 +28,7 @@ import org.apache.aurora.gen.JobKey;
import org.apache.aurora.gen.MaintenanceMode;
import org.apache.aurora.gen.TaskConfig;
import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.async.DelayExecutor;
import org.apache.aurora.scheduler.base.TaskGroupKey;
import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
import org.apache.aurora.scheduler.events.PubsubEvent.HostAttributesChanged;
@@ -85,8 +85,8 @@ public class OfferManagerImplTest extends EasyMockTest {
}
});
driver = createMock(Driver.class);
- ScheduledExecutorService executorMock = createMock(ScheduledExecutorService.class);
- clock = FakeScheduledExecutor.scheduleExecutor(executorMock);
+ DelayExecutor executorMock = createMock(DelayExecutor.class);
+ clock = FakeScheduledExecutor.fromDelayExecutor(executorMock);
addTearDown(new TearDown() {
@Override
http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java b/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java
index 532b0ea..892861d 100644
--- a/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java
@@ -14,20 +14,22 @@
package org.apache.aurora.scheduler.pruning;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Key;
import com.twitter.common.base.Command;
import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;
+import com.twitter.common.stats.StatsProvider;
import com.twitter.common.testing.easymock.EasyMockTest;
import com.twitter.common.util.testing.FakeClock;
@@ -39,6 +41,10 @@ import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.gen.TaskConfig;
import org.apache.aurora.gen.TaskEvent;
+import org.apache.aurora.scheduler.async.AsyncModule;
+import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
+import org.apache.aurora.scheduler.async.DelayExecutor;
+import org.apache.aurora.scheduler.async.FlushableWorkQueue;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
import org.apache.aurora.scheduler.pruning.TaskHistoryPruner.HistoryPrunnerSettings;
@@ -46,6 +52,7 @@ import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.apache.aurora.scheduler.testing.FakeStatsProvider;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
@@ -72,8 +79,7 @@ public class TaskHistoryPrunerTest extends EasyMockTest {
private static final Amount<Long, Time> ONE_HOUR = Amount.of(1L, Time.HOURS);
private static final int PER_JOB_HISTORY = 2;
- private ScheduledFuture<?> future;
- private ScheduledExecutorService executor;
+ private DelayExecutor executor;
private FakeClock clock;
private StateManager stateManager;
private StorageTestUtil storageUtil;
@@ -81,8 +87,7 @@ public class TaskHistoryPrunerTest extends EasyMockTest {
@Before
public void setUp() {
- future = createMock(new Clazz<ScheduledFuture<?>>() { });
- executor = createMock(ScheduledExecutorService.class);
+ executor = createMock(DelayExecutor.class);
clock = new FakeClock();
stateManager = createMock(StateManager.class);
storageUtil = new StorageTestUtil(this);
@@ -232,14 +237,29 @@ public class TaskHistoryPrunerTest extends EasyMockTest {
// lock. This causes a deadlock when the executor tries to acquire a lock held by the event
// fired.
- pruner = prunerWithRealExecutor();
+ ScheduledThreadPoolExecutor realExecutor = new ScheduledThreadPoolExecutor(1,
+ new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat("testThreadSafeEvents-executor")
+ .build());
+ Injector injector = Guice.createInjector(
+ new AsyncModule(realExecutor),
+ new AbstractModule() {
+ @Override
+ protected void configure() {
+ bind(StatsProvider.class).toInstance(new FakeStatsProvider());
+ }
+ });
+ executor = injector.getInstance(Key.get(DelayExecutor.class, AsyncExecutor.class));
+ FlushableWorkQueue flusher =
+ injector.getInstance(Key.get(FlushableWorkQueue.class, AsyncExecutor.class));
+
+ pruner = buildPruner(executor);
Command onDeleted = new Command() {
@Override
public void execute() {
// The goal is to verify that the call does not deadlock. We do not care about the outcome.
- IScheduledTask b = makeTask("b", ASSIGNED);
-
- changeState(b, STARTING);
+ changeState(makeTask("b", ASSIGNED), STARTING);
}
};
CountDownLatch taskDeleted = expectTaskDeleted(onDeleted, TASK_ID);
@@ -248,17 +268,13 @@ public class TaskHistoryPrunerTest extends EasyMockTest {
// Change the task to a terminal state and wait for it to be pruned.
changeState(makeTask(TASK_ID, RUNNING), KILLED);
+ flusher.flush();
taskDeleted.await();
}
- private TaskHistoryPruner prunerWithRealExecutor() {
- ScheduledExecutorService realExecutor = Executors.newScheduledThreadPool(1,
- new ThreadFactoryBuilder()
- .setDaemon(true)
- .setNameFormat("testThreadSafeEvents-executor")
- .build());
+ private TaskHistoryPruner buildPruner(DelayExecutor delayExecutor) {
return new TaskHistoryPruner(
- realExecutor,
+ delayExecutor,
stateManager,
clock,
new HistoryPrunnerSettings(Amount.of(1L, Time.MILLISECONDS), ONE_MS, PER_JOB_HISTORY),
@@ -326,7 +342,7 @@ public class TaskHistoryPrunerTest extends EasyMockTest {
IScheduledTask... pruned) {
// Expect a deferred prune operation when a new task is being watched.
- executor.submit(EasyMock.<Runnable>anyObject());
+ executor.execute(EasyMock.<Runnable>anyObject());
expectLastCall().andAnswer(
new IAnswer<Future<?>>() {
@Override
@@ -348,11 +364,9 @@ public class TaskHistoryPrunerTest extends EasyMockTest {
private Capture<Runnable> expectDelayedPrune(long timestampMillis, int count) {
Capture<Runnable> capture = createCapture();
- executor.schedule(
+ executor.execute(
EasyMock.capture(capture),
- eq(pruner.calculateTimeout(timestampMillis)),
- eq(TimeUnit.MILLISECONDS));
- expectLastCall().andReturn(future).times(count);
+ eq(Amount.of(pruner.calculateTimeout(timestampMillis), Time.MILLISECONDS)));
return capture;
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/test/java/org/apache/aurora/scheduler/reconciliation/KillRetryTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/reconciliation/KillRetryTest.java b/src/test/java/org/apache/aurora/scheduler/reconciliation/KillRetryTest.java
index 26f65fa..957cbd0 100644
--- a/src/test/java/org/apache/aurora/scheduler/reconciliation/KillRetryTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/reconciliation/KillRetryTest.java
@@ -14,12 +14,10 @@
package org.apache.aurora.scheduler.reconciliation;
import java.lang.Thread.UncaughtExceptionHandler;
-import java.util.concurrent.ScheduledExecutorService;
import javax.inject.Singleton;
import com.google.common.eventbus.EventBus;
-import com.google.common.testing.TearDown;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
@@ -34,6 +32,7 @@ import org.apache.aurora.gen.AssignedTask;
import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
+import org.apache.aurora.scheduler.async.DelayExecutor;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
import org.apache.aurora.scheduler.events.PubsubEventModule;
@@ -67,14 +66,9 @@ public class KillRetryTest extends EasyMockTest {
storageUtil = new StorageTestUtil(this);
storageUtil.expectOperations();
backoffStrategy = createMock(BackoffStrategy.class);
- final ScheduledExecutorService executorMock = createMock(ScheduledExecutorService.class);
- clock = FakeScheduledExecutor.scheduleExecutor(executorMock);
- addTearDown(new TearDown() {
- @Override
- public void tearDown() {
- clock.assertEmpty();
- }
- });
+ final DelayExecutor executorMock = createMock(DelayExecutor.class);
+ clock = FakeScheduledExecutor.fromDelayExecutor(executorMock);
+ addTearDown(clock::assertEmpty);
statsProvider = new FakeStatsProvider();
Injector injector = Guice.createInjector(
@@ -85,8 +79,7 @@ public class KillRetryTest extends EasyMockTest {
protected void configure() {
bind(Driver.class).toInstance(driver);
bind(Storage.class).toInstance(storageUtil.storage);
- bind(ScheduledExecutorService.class).annotatedWith(AsyncExecutor.class)
- .toInstance(executorMock);
+ bind(DelayExecutor.class).annotatedWith(AsyncExecutor.class).toInstance(executorMock);
PubsubEventModule.bindSubscriber(binder(), KillRetry.class);
bind(KillRetry.class).in(Singleton.class);
bind(BackoffStrategy.class).toInstance(backoffStrategy);
http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskTimeoutTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskTimeoutTest.java b/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskTimeoutTest.java
index 97d25f9..2bcda70 100644
--- a/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskTimeoutTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/reconciliation/TaskTimeoutTest.java
@@ -13,8 +13,6 @@
*/
package org.apache.aurora.scheduler.reconciliation;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.base.Optional;
@@ -30,6 +28,7 @@ import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.gen.TaskConfig;
import org.apache.aurora.gen.TaskEvent;
+import org.apache.aurora.scheduler.async.DelayExecutor;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
import org.apache.aurora.scheduler.state.StateChangeResult;
import org.apache.aurora.scheduler.state.StateManager;
@@ -52,7 +51,6 @@ import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
import static org.apache.aurora.gen.ScheduleStatus.STARTING;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
import static org.junit.Assert.assertEquals;
public class TaskTimeoutTest extends EasyMockTest {
@@ -61,9 +59,8 @@ public class TaskTimeoutTest extends EasyMockTest {
private static final Amount<Long, Time> TIMEOUT = Amount.of(1L, Time.MINUTES);
private AtomicLong timedOutTaskCounter;
- private ScheduledExecutorService executor;
+ private DelayExecutor executor;
private StorageTestUtil storageUtil;
- private ScheduledFuture<?> future;
private StateManager stateManager;
private FakeClock clock;
private TaskTimeout timeout;
@@ -71,10 +68,9 @@ public class TaskTimeoutTest extends EasyMockTest {
@Before
public void setUp() {
- executor = createMock(ScheduledExecutorService.class);
+ executor = createMock(DelayExecutor.class);
storageUtil = new StorageTestUtil(this);
storageUtil.expectOperations();
- future = createMock(new Clazz<ScheduledFuture<?>>() { });
stateManager = createMock(StateManager.class);
clock = new FakeClock();
statsProvider = createMock(StatsProvider.class);
@@ -96,11 +92,7 @@ public class TaskTimeoutTest extends EasyMockTest {
private Capture<Runnable> expectTaskWatch(Amount<Long, Time> expireIn) {
Capture<Runnable> capture = createCapture();
- executor.schedule(
- EasyMock.capture(capture),
- eq((long) expireIn.getValue()),
- eq(expireIn.getUnit().getTimeUnit()));
- expectLastCall().andReturn(future);
+ executor.execute(EasyMock.capture(capture), eq(expireIn));
return capture;
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java
index 55aad35..6bcfefd 100644
--- a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java
@@ -13,8 +13,6 @@
*/
package org.apache.aurora.scheduler.scheduling;
-import java.util.concurrent.ScheduledExecutorService;
-
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.RateLimiter;
import com.twitter.common.quantity.Amount;
@@ -27,9 +25,11 @@ import org.apache.aurora.gen.JobKey;
import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.async.DelayExecutor;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
+import org.apache.aurora.scheduler.scheduling.TaskGroups.TaskGroupsSettings;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.testing.FakeScheduledExecutor;
@@ -56,17 +56,15 @@ public class TaskGroupsTest extends EasyMockTest {
@Before
public void setUp() throws Exception {
- ScheduledExecutorService executor = createMock(ScheduledExecutorService.class);
- clock = FakeScheduledExecutor.scheduleExecutor(executor);
+ DelayExecutor executor = createMock(DelayExecutor.class);
+ clock = FakeScheduledExecutor.fromDelayExecutor(executor);
backoffStrategy = createMock(BackoffStrategy.class);
taskScheduler = createMock(TaskScheduler.class);
rateLimiter = createMock(RateLimiter.class);
rescheduleCalculator = createMock(RescheduleCalculator.class);
taskGroups = new TaskGroups(
executor,
- FIRST_SCHEDULE_DELAY,
- backoffStrategy,
- rateLimiter,
+ new TaskGroupsSettings(FIRST_SCHEDULE_DELAY, backoffStrategy, rateLimiter),
taskScheduler,
rescheduleCalculator);
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/test/java/org/apache/aurora/scheduler/scheduling/TaskThrottlerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskThrottlerTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskThrottlerTest.java
index 4055021..ba08fe5 100644
--- a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskThrottlerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskThrottlerTest.java
@@ -13,9 +13,6 @@
*/
package org.apache.aurora.scheduler.scheduling;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.twitter.common.quantity.Amount;
@@ -27,6 +24,7 @@ import org.apache.aurora.gen.AssignedTask;
import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.gen.TaskEvent;
+import org.apache.aurora.scheduler.async.DelayExecutor;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
import org.apache.aurora.scheduler.state.StateChangeResult;
@@ -49,7 +47,7 @@ public class TaskThrottlerTest extends EasyMockTest {
private RescheduleCalculator rescheduleCalculator;
private FakeClock clock;
- private ScheduledExecutorService executor;
+ private DelayExecutor executor;
private StorageTestUtil storageUtil;
private StateManager stateManager;
private TaskThrottler throttler;
@@ -58,7 +56,7 @@ public class TaskThrottlerTest extends EasyMockTest {
public void setUp() throws Exception {
rescheduleCalculator = createMock(RescheduleCalculator.class);
clock = new FakeClock();
- executor = createMock(ScheduledExecutorService.class);
+ executor = createMock(DelayExecutor.class);
storageUtil = new StorageTestUtil(this);
storageUtil.expectOperations();
stateManager = createMock(StateManager.class);
@@ -116,11 +114,9 @@ public class TaskThrottlerTest extends EasyMockTest {
private Capture<Runnable> expectThrottled(long penaltyMs) {
Capture<Runnable> stateChangeCapture = createCapture();
- expect(executor.schedule(
+ executor.execute(
capture(stateChangeCapture),
- eq(penaltyMs),
- eq(TimeUnit.MILLISECONDS)))
- .andReturn(null);
+ eq(Amount.of(penaltyMs, Time.MILLISECONDS)));
return stateChangeCapture;
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java
index a4bcdd7..3b05db9 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java
@@ -15,6 +15,7 @@ package org.apache.aurora.scheduler.storage.db;
import com.twitter.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.scheduler.async.FlushableWorkQueue;
import org.apache.aurora.scheduler.storage.AttributeStore;
import org.apache.aurora.scheduler.storage.CronJobStore;
import org.apache.aurora.scheduler.storage.JobUpdateStore;
@@ -40,6 +41,7 @@ public class DbStorageTest extends EasyMockTest {
private SqlSessionFactory sessionFactory;
private SqlSession session;
private EnumValueMapper enumMapper;
+ private FlushableWorkQueue flusher;
private Work.Quiet<String> readWork;
private MutateWork.NoResult.Quiet writeWork;
@@ -50,12 +52,14 @@ public class DbStorageTest extends EasyMockTest {
sessionFactory = createMock(SqlSessionFactory.class);
session = createMock(SqlSession.class);
enumMapper = createMock(EnumValueMapper.class);
+ flusher = createMock(FlushableWorkQueue.class);
readWork = createMock(new Clazz<Work.Quiet<String>>() { });
writeWork = createMock(new Clazz<MutateWork.NoResult.Quiet>() { });
storage = new DbStorage(
sessionFactory,
enumMapper,
+ flusher,
createMock(CronJobStore.Mutable.class),
createMock(TaskStore.Mutable.class),
createMock(SchedulerStore.Mutable.class),
@@ -89,6 +93,7 @@ public class DbStorageTest extends EasyMockTest {
expect(sessionFactory.openSession(false)).andReturn(session);
expect(session.update(DbStorage.DISABLE_UNDO_LOG)).andThrow(new PersistenceException());
expect(session.update(DbStorage.ENABLE_UNDO_LOG)).andReturn(0);
+ flusher.flush();
control.replay();
@@ -102,6 +107,7 @@ public class DbStorageTest extends EasyMockTest {
expect(writeWork.apply(EasyMock.anyObject())).andReturn(null);
session.close();
expect(session.update(DbStorage.ENABLE_UNDO_LOG)).andReturn(0);
+ flusher.flush();
control.replay();
http://git-wip-us.apache.org/repos/asf/aurora/blob/61c63ea9/src/test/java/org/apache/aurora/scheduler/testing/FakeScheduledExecutor.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/testing/FakeScheduledExecutor.java b/src/test/java/org/apache/aurora/scheduler/testing/FakeScheduledExecutor.java
index e198c1c..48978ec 100644
--- a/src/test/java/org/apache/aurora/scheduler/testing/FakeScheduledExecutor.java
+++ b/src/test/java/org/apache/aurora/scheduler/testing/FakeScheduledExecutor.java
@@ -27,6 +27,7 @@ import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;
import com.twitter.common.util.testing.FakeClock;
+import org.apache.aurora.scheduler.async.DelayExecutor;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
@@ -43,6 +44,7 @@ public final class FakeScheduledExecutor extends FakeClock {
private FakeScheduledExecutor() { }
+ // TODO(wfarner): Rename to fromScheduledExecutor().
public static FakeScheduledExecutor scheduleExecutor(ScheduledExecutorService mock) {
FakeScheduledExecutor executor = new FakeScheduledExecutor();
mock.schedule(
@@ -57,6 +59,33 @@ public final class FakeScheduledExecutor extends FakeClock {
return executor;
}
+ private static IAnswer<Object> answerExecuteWithDelay(final FakeScheduledExecutor executor) {
+ return new IAnswer<Object>() {
+ @Override
+ public Object answer() {
+ Object[] args = EasyMock.getCurrentArguments();
+ Runnable work = (Runnable) args[0];
+ @SuppressWarnings("unchecked")
+ Amount<Long, Time> delay = (Amount<Long, Time>) args[1];
+ addDelayedWork(executor, delay.as(Time.MILLISECONDS), work);
+ return null;
+ }
+ };
+ }
+
+ public static FakeScheduledExecutor fromDelayExecutor(DelayExecutor mock) {
+ FakeScheduledExecutor executor = new FakeScheduledExecutor();
+ mock.execute(
+ EasyMock.<Runnable>anyObject(),
+ EasyMock.<Amount<Long, Time>>anyObject());
+ expectLastCall().andAnswer(answerExecuteWithDelay(executor)).anyTimes();
+
+ mock.execute(EasyMock.anyObject());
+ expectLastCall().andAnswer(answerExecute()).anyTimes();
+
+ return executor;
+ }
+
private static IAnswer<Void> answerExecute() {
return new IAnswer<Void>() {
@Override
@@ -69,10 +98,10 @@ public final class FakeScheduledExecutor extends FakeClock {
};
}
- private static IAnswer<ScheduledFuture<?>> answerSchedule(final FakeScheduledExecutor executor) {
- return new IAnswer<ScheduledFuture<?>>() {
+ private static IAnswer<Object> answerSchedule(final FakeScheduledExecutor executor) {
+ return new IAnswer<Object>() {
@Override
- public ScheduledFuture<?> answer() {
+ public Object answer() {
Object[] args = EasyMock.getCurrentArguments();
Runnable work = (Runnable) args[0];
long value = (Long) args[1];