You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ma...@apache.org on 2016/09/16 21:45:17 UTC
[1/3] aurora git commit: Batching writes - Part 1 (of 3): Introducing
BatchWorker and task event batching.
Repository: aurora
Updated Branches:
refs/heads/master f1e09a9c7 -> 496397aa5
Batching writes - Part 1 (of 3): Introducing BatchWorker and task event batching.
Reviewed at https://reviews.apache.org/r/51759/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/ebfeb3e6
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/ebfeb3e6
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/ebfeb3e6
Branch: refs/heads/master
Commit: ebfeb3e602faa9281ff7ff50f42bd21885518953
Parents: f1e09a9
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Fri Sep 16 14:17:04 2016 -0700
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Fri Sep 16 14:17:04 2016 -0700
----------------------------------------------------------------------
.../apache/aurora/scheduler/BatchWorker.java | 254 +++++++++++++++++++
.../aurora/scheduler/SchedulerModule.java | 25 ++
.../scheduler/pruning/TaskHistoryPruner.java | 14 +-
.../scheduler/scheduling/TaskThrottler.java | 28 +-
.../scheduler/state/MaintenanceController.java | 14 +-
.../updater/JobUpdateControllerImpl.java | 10 +-
.../aurora/scheduler/BatchWorkerTest.java | 96 +++++++
.../pruning/TaskHistoryPrunerTest.java | 10 +-
.../scheduler/scheduling/TaskThrottlerTest.java | 9 +-
.../state/MaintenanceControllerImplTest.java | 5 +
.../scheduler/testing/BatchWorkerUtil.java | 59 +++++
.../aurora/scheduler/updater/JobUpdaterIT.java | 7 +-
12 files changed, 505 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/ebfeb3e6/src/main/java/org/apache/aurora/scheduler/BatchWorker.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/BatchWorker.java b/src/main/java/org/apache/aurora/scheduler/BatchWorker.java
new file mode 100644
index 0000000..e05d4b4
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/BatchWorker.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.inject.Inject;
+
+import com.google.common.util.concurrent.AbstractExecutionThreadService;
+
+import org.apache.aurora.common.stats.SlidingStats;
+import org.apache.aurora.common.stats.StatsProvider;
+import org.apache.aurora.common.util.BackoffStrategy;
+import org.apache.aurora.scheduler.base.AsyncUtil;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Objects.requireNonNull;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Generic helper that allows bundling multiple work items into a single {@link Storage}
+ * transaction aiming to reduce the write lock contention.
+ *
+ * @param <T> Expected result type.
+ */
+public class BatchWorker<T> extends AbstractExecutionThreadService {
+ /**
+ * Empty result placeholder.
+ */
+ public interface NoResult { }
+
+ /**
+ * Shared placeholder value returned by {@link Work} items that produce no meaningful result.
+ */
+ public static final NoResult NO_RESULT = new NoResult() { };
+
+ private static final Logger LOG = LoggerFactory.getLogger(BatchWorker.class);
+ private final Storage storage;
+ private final int maxBatchSize;
+ private final SlidingStats batchUnlocked;
+ private final SlidingStats batchLocked;
+ private final BlockingQueue<WorkItem<T>> workQueue = new LinkedBlockingQueue<>();
+ private final ScheduledExecutorService scheduledExecutor;
+ private final AtomicInteger lastBatchSize = new AtomicInteger(0);
+ private final AtomicLong itemsProcessed;
+ private final AtomicLong batchesProcessed;
+
+ /**
+ * Wraps result returned by the {@link RepeatableWork} item.
+ *
+ * @param <T> Expected result type.
+ */
+ public static class Result<T> {
+ private final boolean isCompleted;
+ private final T value;
+
+ /**
+ * Initializes a {@link Result} instance with {@code isCompleted} and {@code value}.
+ * <p>
+ * The {@code isCompleted} may be set to {@code false} for a {@link RepeatableWork} that has
+ * not finished yet. Otherwise, it must be set to {@code true}.
+ *
+ * @param isCompleted Flag indicating if the {@link RepeatableWork} has completed.
+ * @param value result value.
+ */
+ public Result(boolean isCompleted, T value) {
+ this.isCompleted = isCompleted;
+ this.value = value;
+ }
+ }
+
+ /**
+ * Encapsulates a potentially repeatable operation.
+ */
+ public interface RepeatableWork<T> {
+ /**
+ * Abstracts a unit of repeatable (i.e., "repeat until completed") work.
+ * <p>
+ * The work unit may be repeated as instructed by the {@link Result}.
+ *
+ * @param storeProvider {@link MutableStoreProvider} instance.
+ * @return {@link Result}
+ */
+ Result<T> apply(MutableStoreProvider storeProvider);
+ }
+
+ /**
+ * Encapsulates a non-repeatable operation.
+ */
+ public interface Work<T> extends RepeatableWork<T> {
+ @Override
+ default Result<T> apply(MutableStoreProvider storeProvider) {
+ T value = execute(storeProvider);
+ return new Result<>(true, value);
+ }
+
+ /**
+ * Abstracts a unit of non-repeatable (i.e., "run exactly once") work.
+ *
+ * @param storeProvider {@link MutableStoreProvider} instance.
+ * @return result value.
+ */
+ T execute(MutableStoreProvider storeProvider);
+ }
+
+ @Inject
+ protected BatchWorker(Storage storage, StatsProvider statsProvider, int maxBatchSize) {
+ this.storage = requireNonNull(storage);
+ this.maxBatchSize = maxBatchSize;
+
+ scheduledExecutor = AsyncUtil.singleThreadLoggingScheduledExecutor(serviceName() + "-%d", LOG);
+ statsProvider.makeGauge(serviceName() + "_queue_size", () -> workQueue.size());
+ statsProvider.makeGauge(
+ serviceName() + "_last_processed_batch_size",
+ () -> lastBatchSize.intValue());
+ batchUnlocked = new SlidingStats(serviceName() + "_batch_unlocked", "nanos");
+ batchLocked = new SlidingStats(serviceName() + "_batch_locked", "nanos");
+ itemsProcessed = statsProvider.makeCounter(serviceName() + "_items_processed");
+ batchesProcessed = statsProvider.makeCounter(serviceName() + "_batches_processed");
+ }
+
+ /**
+ * Executes a non-repeatable {@link Work} and returns {@link CompletableFuture} to wait on.
+ *
+ * @param work A non-repeatable {@link Work} to execute.
+ * @return {@link CompletableFuture} to wait on.
+ */
+ public CompletableFuture<T> execute(Work<T> work) {
+ CompletableFuture<T> result = new CompletableFuture<>();
+ workQueue.add(new WorkItem<>(
+ work,
+ result,
+ Optional.empty(),
+ Optional.empty()));
+
+ return result;
+ }
+
+ /**
+ * Executes a {@link RepeatableWork} until it completes and returns {@link CompletableFuture}
+ * to wait on.
+ *
+ * @param backoffStrategy A {@link BackoffStrategy} instance to backoff subsequent runs.
+ * @param work A {@link RepeatableWork} to execute.
+ */
+ public CompletableFuture<T> executeWithReplay(
+ BackoffStrategy backoffStrategy,
+ RepeatableWork<T> work) {
+
+ CompletableFuture<T> result = new CompletableFuture<>();
+ workQueue.add(new WorkItem<>(
+ work,
+ result,
+ Optional.of(backoffStrategy),
+ Optional.of(0L)));
+
+ return result;
+ }
+
+ @Override
+ protected void run() throws Exception {
+ while (isRunning()) {
+ List<WorkItem<T>> batch = new LinkedList<>();
+ batch.add(workQueue.take());
+ workQueue.drainTo(batch, maxBatchSize - batch.size());
+ processBatch(batch);
+ }
+ }
+
+ private void processBatch(List<WorkItem<T>> batch) {
+ if (!batch.isEmpty()) {
+ long unlockedStart = System.nanoTime();
+ storage.write((Storage.MutateWork.NoResult.Quiet) storeProvider -> {
+ long lockedStart = System.nanoTime();
+ for (WorkItem<T> item : batch) {
+ try {
+ Result<T> itemResult = item.work.apply(storeProvider);
+ if (itemResult.isCompleted) {
+ item.result.complete(itemResult.value);
+ } else {
+ // Work not finished yet - re-queue for a followup later.
+ long backoffMsec = backoffFor(item);
+ scheduledExecutor.schedule(
+ () -> workQueue.add(new WorkItem<>(
+ item.work,
+ item.result,
+ item.backoffStrategy,
+ Optional.of(backoffMsec))),
+ backoffMsec,
+ TimeUnit.MILLISECONDS);
+ }
+ } catch (RuntimeException e) {
+ LOG.error("{}: Failed to process batch item. Error: {}", serviceName(), e);
+ item.result.completeExceptionally(e);
+ }
+ }
+ batchLocked.accumulate(System.nanoTime() - lockedStart);
+ });
+ batchUnlocked.accumulate(System.nanoTime() - unlockedStart);
+ batchesProcessed.incrementAndGet();
+ lastBatchSize.set(batch.size());
+ itemsProcessed.addAndGet(batch.size());
+ }
+ }
+
+ private long backoffFor(WorkItem<T> item) {
+ checkState(item.backoffStrategy.isPresent());
+ checkState(item.lastBackoffMsec.isPresent());
+ return item.backoffStrategy.get().calculateBackoffMs(item.lastBackoffMsec.get());
+ }
+
+ private class WorkItem<V> {
+ private final RepeatableWork<V> work;
+ private final CompletableFuture<T> result;
+ private final Optional<BackoffStrategy> backoffStrategy;
+ private final Optional<Long> lastBackoffMsec;
+
+ WorkItem(
+ RepeatableWork<V> work,
+ CompletableFuture<T> result,
+ Optional<BackoffStrategy> backoffStrategy,
+ Optional<Long> lastBackoffMsec) {
+
+ this.work = work;
+ this.result = result;
+ this.backoffStrategy = backoffStrategy;
+ this.lastBackoffMsec = lastBackoffMsec;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/ebfeb3e6/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java b/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
index 4a7ef0b..2ec3967 100644
--- a/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
@@ -16,6 +16,8 @@ package org.apache.aurora.scheduler;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
+
+import javax.inject.Inject;
import javax.inject.Singleton;
import com.google.inject.AbstractModule;
@@ -27,10 +29,13 @@ import org.apache.aurora.common.args.CmdLine;
import org.apache.aurora.common.args.constraints.Positive;
import org.apache.aurora.common.quantity.Amount;
import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.stats.StatsProvider;
+import org.apache.aurora.scheduler.BatchWorker.NoResult;
import org.apache.aurora.scheduler.SchedulerLifecycle.LeadingOptions;
import org.apache.aurora.scheduler.TaskIdGenerator.TaskIdGeneratorImpl;
import org.apache.aurora.scheduler.base.AsyncUtil;
import org.apache.aurora.scheduler.events.PubsubEventModule;
+import org.apache.aurora.scheduler.storage.Storage;
import org.apache.mesos.Protos;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,6 +64,11 @@ public class SchedulerModule extends AbstractModule {
help = "The maximum number of status updates that can be processed in a batch.")
private static final Arg<Integer> MAX_STATUS_UPDATE_BATCH_SIZE = Arg.create(1000);
+ @Positive
+ @CmdLine(name = "max_task_event_batch_size",
+ help = "The maximum number of task state change events that can be processed in a batch.")
+ private static final Arg<Integer> MAX_TASK_EVENT_BATCH_SIZE = Arg.create(300);
+
@Override
protected void configure() {
bind(TaskIdGenerator.class).to(TaskIdGeneratorImpl.class);
@@ -93,6 +103,21 @@ public class SchedulerModule extends AbstractModule {
bind(TaskStatusHandler.class).to(TaskStatusHandlerImpl.class);
bind(TaskStatusHandlerImpl.class).in(Singleton.class);
addSchedulerActiveServiceBinding(binder()).to(TaskStatusHandlerImpl.class);
+
+ bind(TaskEventBatchWorker.class).in(Singleton.class);
+ addSchedulerActiveServiceBinding(binder()).to(TaskEventBatchWorker.class);
}
+ public static class TaskEventBatchWorker extends BatchWorker<NoResult> {
+ @Inject
+ TaskEventBatchWorker(Storage storage, StatsProvider statsProvider) {
+
+ super(storage, statsProvider, MAX_TASK_EVENT_BATCH_SIZE.get());
+ }
+
+ @Override
+ protected String serviceName() {
+ return "TaskEventBatchWorker";
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/ebfeb3e6/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
index f07746c..c672826 100644
--- a/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
+++ b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
@@ -29,13 +29,14 @@ import org.apache.aurora.common.quantity.Amount;
import org.apache.aurora.common.quantity.Time;
import org.apache.aurora.common.util.Clock;
import org.apache.aurora.gen.apiConstants;
+import org.apache.aurora.scheduler.BatchWorker;
+import org.apache.aurora.scheduler.SchedulerModule.TaskEventBatchWorker;
import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
import org.apache.aurora.scheduler.async.DelayExecutor;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.slf4j.Logger;
@@ -62,6 +63,7 @@ public class TaskHistoryPruner implements EventSubscriber {
private final HistoryPrunnerSettings settings;
private final Storage storage;
private final Lifecycle lifecycle;
+ private final TaskEventBatchWorker batchWorker;
private final Predicate<IScheduledTask> safeToDelete = new Predicate<IScheduledTask>() {
@Override
@@ -94,7 +96,8 @@ public class TaskHistoryPruner implements EventSubscriber {
Clock clock,
HistoryPrunnerSettings settings,
Storage storage,
- Lifecycle lifecycle) {
+ Lifecycle lifecycle,
+ TaskEventBatchWorker batchWorker) {
this.executor = requireNonNull(executor);
this.stateManager = requireNonNull(stateManager);
@@ -102,6 +105,7 @@ public class TaskHistoryPruner implements EventSubscriber {
this.settings = requireNonNull(settings);
this.storage = requireNonNull(storage);
this.lifecycle = requireNonNull(lifecycle);
+ this.batchWorker = requireNonNull(batchWorker);
}
@VisibleForTesting
@@ -131,8 +135,10 @@ public class TaskHistoryPruner implements EventSubscriber {
private void deleteTasks(final Set<String> taskIds) {
LOG.info("Pruning inactive tasks " + taskIds);
- storage.write(
- (NoResult.Quiet) storeProvider -> stateManager.deleteTasks(storeProvider, taskIds));
+ batchWorker.execute(storeProvider -> {
+ stateManager.deleteTasks(storeProvider, taskIds);
+ return BatchWorker.NO_RESULT;
+ });
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/aurora/blob/ebfeb3e6/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java
index bbd971a..867c9bd 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java
@@ -22,13 +22,14 @@ import org.apache.aurora.common.quantity.Amount;
import org.apache.aurora.common.quantity.Time;
import org.apache.aurora.common.stats.SlidingStats;
import org.apache.aurora.common.util.Clock;
+import org.apache.aurora.scheduler.BatchWorker;
+import org.apache.aurora.scheduler.SchedulerModule.TaskEventBatchWorker;
import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
import org.apache.aurora.scheduler.async.DelayExecutor;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
import org.apache.aurora.scheduler.state.StateManager;
-import org.apache.aurora.scheduler.storage.Storage;
import static java.util.Objects.requireNonNull;
@@ -46,8 +47,8 @@ class TaskThrottler implements EventSubscriber {
private final RescheduleCalculator rescheduleCalculator;
private final Clock clock;
private final DelayExecutor executor;
- private final Storage storage;
private final StateManager stateManager;
+ private final TaskEventBatchWorker batchWorker;
private final SlidingStats throttleStats = new SlidingStats("task_throttle", "ms");
@@ -56,14 +57,14 @@ class TaskThrottler implements EventSubscriber {
RescheduleCalculator rescheduleCalculator,
Clock clock,
@AsyncExecutor DelayExecutor executor,
- Storage storage,
- StateManager stateManager) {
+ StateManager stateManager,
+ TaskEventBatchWorker batchWorker) {
this.rescheduleCalculator = requireNonNull(rescheduleCalculator);
this.clock = requireNonNull(clock);
this.executor = requireNonNull(executor);
- this.storage = requireNonNull(storage);
this.stateManager = requireNonNull(stateManager);
+ this.batchWorker = requireNonNull(batchWorker);
}
@Subscribe
@@ -73,13 +74,16 @@ class TaskThrottler implements EventSubscriber {
+ rescheduleCalculator.getFlappingPenaltyMs(stateChange.getTask());
long delayMs = Math.max(0, readyAtMs - clock.nowMillis());
throttleStats.accumulate(delayMs);
- executor.execute(
- () -> storage.write(storeProvider -> stateManager.changeState(
- storeProvider,
- stateChange.getTaskId(),
- Optional.of(THROTTLED),
- PENDING,
- Optional.absent())),
+ executor.execute(() ->
+ batchWorker.execute(storeProvider -> {
+ stateManager.changeState(
+ storeProvider,
+ stateChange.getTaskId(),
+ Optional.of(THROTTLED),
+ PENDING,
+ Optional.absent());
+ return BatchWorker.NO_RESULT;
+ }),
Amount.of(delayMs, Time.MILLISECONDS));
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/ebfeb3e6/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
index 3c7cda0..574efc9 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
@@ -30,6 +30,8 @@ import com.google.common.eventbus.Subscribe;
import org.apache.aurora.gen.HostStatus;
import org.apache.aurora.gen.MaintenanceMode;
import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.scheduler.BatchWorker;
+import org.apache.aurora.scheduler.SchedulerModule.TaskEventBatchWorker;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
@@ -37,7 +39,6 @@ import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
import org.apache.aurora.scheduler.storage.AttributeStore;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
import org.apache.aurora.scheduler.storage.entities.IHostStatus;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -106,11 +107,17 @@ public interface MaintenanceController {
private static final Logger LOG = LoggerFactory.getLogger(MaintenanceControllerImpl.class);
private final Storage storage;
private final StateManager stateManager;
+ private final TaskEventBatchWorker batchWorker;
@Inject
- public MaintenanceControllerImpl(Storage storage, StateManager stateManager) {
+ public MaintenanceControllerImpl(
+ Storage storage,
+ StateManager stateManager,
+ TaskEventBatchWorker batchWorker) {
+
this.storage = requireNonNull(storage);
this.stateManager = requireNonNull(stateManager);
+ this.batchWorker = requireNonNull(batchWorker);
}
private Set<IHostStatus> watchDrainingTasks(MutableStoreProvider store, Set<String> hosts) {
@@ -153,7 +160,7 @@ public interface MaintenanceController {
public void taskChangedState(final TaskStateChange change) {
if (Tasks.isTerminated(change.getNewState())) {
final String host = change.getTask().getAssignedTask().getSlaveHost();
- storage.write((NoResult.Quiet) (MutableStoreProvider store) -> {
+ batchWorker.execute(store -> {
// If the task _was_ associated with a draining host, and it was the last task on the
// host.
Optional<IHostAttributes> attributes =
@@ -168,6 +175,7 @@ public interface MaintenanceController {
LOG.info("Host {} is DRAINING with active tasks: {}", host, Tasks.ids(activeTasks));
}
}
+ return BatchWorker.NO_RESULT;
});
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/ebfeb3e6/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
index ef6253e..25b3f37 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
@@ -44,6 +44,8 @@ import org.apache.aurora.gen.JobUpdateQuery;
import org.apache.aurora.gen.JobUpdateStatus;
import org.apache.aurora.gen.Lock;
import org.apache.aurora.gen.LockKey;
+import org.apache.aurora.scheduler.BatchWorker;
+import org.apache.aurora.scheduler.SchedulerModule.TaskEventBatchWorker;
import org.apache.aurora.scheduler.base.InstanceKeys;
import org.apache.aurora.scheduler.base.JobKeys;
import org.apache.aurora.scheduler.base.Query;
@@ -120,6 +122,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
private final Clock clock;
private final PulseHandler pulseHandler;
private final Lifecycle lifecycle;
+ private final TaskEventBatchWorker batchWorker;
// Currently-active updaters. An active updater is one that is rolling forward or back. Paused
// and completed updates are represented only in storage, not here.
@@ -134,7 +137,8 @@ class JobUpdateControllerImpl implements JobUpdateController {
ScheduledExecutorService executor,
StateManager stateManager,
Clock clock,
- Lifecycle lifecycle) {
+ Lifecycle lifecycle,
+ TaskEventBatchWorker batchWorker) {
this.updateFactory = requireNonNull(updateFactory);
this.lockManager = requireNonNull(lockManager);
@@ -143,6 +147,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
this.stateManager = requireNonNull(stateManager);
this.clock = requireNonNull(clock);
this.lifecycle = requireNonNull(lifecycle);
+ this.batchWorker = requireNonNull(batchWorker);
this.pulseHandler = new PulseHandler(clock);
}
@@ -346,7 +351,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
}
private void instanceChanged(final IInstanceKey instance, final Optional<IScheduledTask> state) {
- storage.write((NoResult.Quiet) storeProvider -> {
+ batchWorker.execute(storeProvider -> {
IJobKey job = instance.getJobKey();
UpdateFactory.Update update = updates.get(job);
if (update != null) {
@@ -366,6 +371,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
+ JobKeys.canonicalString(job));
}
}
+ return BatchWorker.NO_RESULT;
});
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/ebfeb3e6/src/test/java/org/apache/aurora/scheduler/BatchWorkerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/BatchWorkerTest.java b/src/test/java/org/apache/aurora/scheduler/BatchWorkerTest.java
new file mode 100644
index 0000000..a86dc82
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/BatchWorkerTest.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.common.util.BackoffStrategy;
+import org.apache.aurora.scheduler.BatchWorker.Result;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.apache.aurora.scheduler.testing.FakeStatsProvider;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertTrue;
+
+public class BatchWorkerTest extends EasyMockTest {
+ private static final String SERVICE_NAME = "TestWorker";
+ private static final String BATCH_STAT = SERVICE_NAME + "_batches_processed";
+ private FakeStatsProvider statsProvider;
+ private BatchWorker<Boolean> batchWorker;
+
+ @Before
+ public void setUp() {
+ StorageTestUtil storageUtil = new StorageTestUtil(this);
+ storageUtil.expectOperations();
+ statsProvider = new FakeStatsProvider();
+ batchWorker = new BatchWorker<Boolean>(storageUtil.storage, statsProvider, 2) {
+ @Override
+ protected String serviceName() {
+ return SERVICE_NAME;
+ }
+ };
+ }
+
+ @Test
+ public void testExecute() throws Exception {
+ control.replay();
+
+ CompletableFuture<Boolean> result1 = batchWorker.execute(store -> true);
+ CompletableFuture<Boolean> result2 = batchWorker.execute(store -> true);
+ CompletableFuture<Boolean> result3 = batchWorker.execute(store -> true);
+ batchWorker.startAsync().awaitRunning();
+
+ assertTrue(result1.get());
+ assertTrue(result2.get());
+ assertTrue(result3.get());
+ }
+
+ @Test(expected = ExecutionException.class)
+ public void testExecuteThrows() throws Exception {
+ control.replay();
+
+ CompletableFuture<Boolean> result =
+ batchWorker.execute(store -> { throw new IllegalArgumentException(); });
+ batchWorker.startAsync().awaitRunning();
+
+ result.get();
+ }
+
+ @Test
+ public void testExecuteWithReplay() throws Exception {
+ BackoffStrategy backoff = createMock(BackoffStrategy.class);
+ final CountDownLatch complete = new CountDownLatch(1);
+
+ expect(backoff.calculateBackoffMs(EasyMock.anyLong())).andReturn(0L).anyTimes();
+
+ control.replay();
+
+ batchWorker.startAsync().awaitRunning();
+ batchWorker.executeWithReplay(
+ backoff,
+ store -> statsProvider.getValue(BATCH_STAT).longValue() > 1L
+ ? new Result<>(true, true)
+ : new Result<>(false, false))
+ .thenAccept(result -> complete.countDown());
+
+ assertTrue(complete.await(10L, TimeUnit.SECONDS));
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/ebfeb3e6/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java b/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java
index 99c27e8..8469596 100644
--- a/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java
@@ -27,6 +27,7 @@ import org.apache.aurora.common.testing.easymock.EasyMockTest;
import org.apache.aurora.common.util.testing.FakeClock;
import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.scheduler.SchedulerModule.TaskEventBatchWorker;
import org.apache.aurora.scheduler.async.DelayExecutor;
import org.apache.aurora.scheduler.base.JobKeys;
import org.apache.aurora.scheduler.base.TaskTestUtil;
@@ -48,6 +49,7 @@ import static org.apache.aurora.gen.ScheduleStatus.KILLED;
import static org.apache.aurora.gen.ScheduleStatus.LOST;
import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
import static org.apache.aurora.gen.ScheduleStatus.STARTING;
+import static org.apache.aurora.scheduler.testing.BatchWorkerUtil.expectBatchExecute;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expectLastCall;
@@ -68,20 +70,24 @@ public class TaskHistoryPrunerTest extends EasyMockTest {
private Command shutdownCommand;
@Before
- public void setUp() {
+ public void setUp() throws Exception {
executor = createMock(DelayExecutor.class);
clock = new FakeClock();
stateManager = createMock(StateManager.class);
storageUtil = new StorageTestUtil(this);
storageUtil.expectOperations();
shutdownCommand = createMock(Command.class);
+ TaskEventBatchWorker batchWorker = createMock(TaskEventBatchWorker.class);
+ expectBatchExecute(batchWorker, storageUtil.storage, control).anyTimes();
+
pruner = new TaskHistoryPruner(
executor,
stateManager,
clock,
new HistoryPrunnerSettings(ONE_DAY, ONE_MINUTE, PER_JOB_HISTORY),
storageUtil.storage,
- new Lifecycle(shutdownCommand));
+ new Lifecycle(shutdownCommand),
+ batchWorker);
closer = Closer.create();
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/ebfeb3e6/src/test/java/org/apache/aurora/scheduler/scheduling/TaskThrottlerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskThrottlerTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskThrottlerTest.java
index 7d104aa..433f791 100644
--- a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskThrottlerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskThrottlerTest.java
@@ -24,6 +24,7 @@ import org.apache.aurora.gen.AssignedTask;
import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.gen.TaskEvent;
+import org.apache.aurora.scheduler.SchedulerModule.TaskEventBatchWorker;
import org.apache.aurora.scheduler.async.DelayExecutor;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
@@ -39,6 +40,7 @@ import static org.apache.aurora.gen.ScheduleStatus.INIT;
import static org.apache.aurora.gen.ScheduleStatus.PENDING;
import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
+import static org.apache.aurora.scheduler.testing.BatchWorkerUtil.expectBatchExecute;
import static org.easymock.EasyMock.capture;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
@@ -60,12 +62,15 @@ public class TaskThrottlerTest extends EasyMockTest {
storageUtil = new StorageTestUtil(this);
storageUtil.expectOperations();
stateManager = createMock(StateManager.class);
+ TaskEventBatchWorker batchWorker = createMock(TaskEventBatchWorker.class);
+ expectBatchExecute(batchWorker, storageUtil.storage, control).anyTimes();
+
throttler = new TaskThrottler(
rescheduleCalculator,
clock,
executor,
- storageUtil.storage,
- stateManager);
+ stateManager,
+ batchWorker);
}
@Test
http://git-wip-us.apache.org/repos/asf/aurora/blob/ebfeb3e6/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java
index 94f5ca5..ae83dea 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java
@@ -30,6 +30,7 @@ import org.apache.aurora.gen.HostStatus;
import org.apache.aurora.gen.MaintenanceMode;
import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.scheduler.SchedulerModule.TaskEventBatchWorker;
import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.TaskTestUtil;
@@ -53,6 +54,7 @@ import static org.apache.aurora.gen.MaintenanceMode.SCHEDULED;
import static org.apache.aurora.gen.ScheduleStatus.KILLED;
import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
import static org.apache.aurora.scheduler.state.MaintenanceController.MaintenanceControllerImpl;
+import static org.apache.aurora.scheduler.testing.BatchWorkerUtil.expectBatchExecute;
import static org.easymock.EasyMock.expect;
import static org.junit.Assert.assertEquals;
@@ -71,6 +73,8 @@ public class MaintenanceControllerImplTest extends EasyMockTest {
storageUtil = new StorageTestUtil(this);
storageUtil.expectOperations();
stateManager = createMock(StateManager.class);
+ TaskEventBatchWorker batchWorker = createMock(TaskEventBatchWorker.class);
+ expectBatchExecute(batchWorker, storageUtil.storage, control).anyTimes();
Injector injector = Guice.createInjector(
new PubsubEventModule(),
@@ -83,6 +87,7 @@ public class MaintenanceControllerImplTest extends EasyMockTest {
bind(StatsProvider.class).toInstance(new FakeStatsProvider());
bind(Executor.class).annotatedWith(AsyncExecutor.class)
.toInstance(MoreExecutors.directExecutor());
+ bind(TaskEventBatchWorker.class).toInstance(batchWorker);
}
});
maintenance = injector.getInstance(MaintenanceController.class);
http://git-wip-us.apache.org/repos/asf/aurora/blob/ebfeb3e6/src/test/java/org/apache/aurora/scheduler/testing/BatchWorkerUtil.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/testing/BatchWorkerUtil.java b/src/test/java/org/apache/aurora/scheduler/testing/BatchWorkerUtil.java
new file mode 100644
index 0000000..46b2e36
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/testing/BatchWorkerUtil.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.testing;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.scheduler.BatchWorker;
+import org.apache.aurora.scheduler.BatchWorker.Work;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.easymock.Capture;
+import org.easymock.IExpectationSetters;
+import org.easymock.IMocksControl;
+
+import static org.apache.aurora.common.testing.easymock.EasyMockTest.createCapture;
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.expect;
+
+/**
+ * Test helper for stubbing {@link BatchWorker} mocks so that work submitted via
+ * {@code execute} is run against a supplied {@link Storage} when the mock is invoked.
+ */
+public final class BatchWorkerUtil {
+ private BatchWorkerUtil() {
+ // Utility class.
+ }
+
+ /**
+ * Records an expectation for {@code batchWorker.execute(...)} on a mocked worker.
+ * <p>
+ * When the mock is called, the captured work item is applied inside
+ * {@code storage.write(...)} and a mocked {@link CompletableFuture} is returned. The value
+ * produced by the work item itself is discarded; the future's {@code get()} always yields
+ * {@code resultValue}.
+ *
+ * @param batchWorker Mocked batch worker to record the expectation on.
+ * @param storage Storage the captured work is executed against.
+ * @param control Mocks control used to create the mocked future.
+ * @param resultValue Value returned by {@code get()} on the mocked future.
+ * @param <T> Batch worker result type.
+ * @return Expectation setters, so the caller can choose the call count (e.g. {@code anyTimes()}).
+ * @throws Exception Declared because recording {@code result.get()} names checked exceptions.
+ */
+ public static <T> IExpectationSetters<CompletableFuture<T>> expectBatchExecute(
+ BatchWorker<T> batchWorker,
+ Storage storage,
+ IMocksControl control,
+ T resultValue) throws Exception {
+
+ // The future is a mock as well: only get() is stubbed, any number of times.
+ final CompletableFuture<T> result = new EasyMockTest.Clazz<CompletableFuture<T>>() { }
+ .createMock(control);
+ expect(result.get()).andReturn(resultValue).anyTimes();
+
+ // Capture the submitted work so the answer below can run it at call time.
+ final Capture<Work<T>> capture = createCapture();
+ return expect(batchWorker.execute(capture(capture))).andAnswer(() -> {
+ storage.write((Storage.MutateWork.NoResult.Quiet) store -> capture.getValue().apply(store));
+ return result;
+ });
+ }
+
+ /**
+ * Same as the four-argument overload, with {@code null} as the value returned by the mocked
+ * future's {@code get()}.
+ *
+ * @param batchWorker Mocked batch worker to record the expectation on.
+ * @param storage Storage the captured work is executed against.
+ * @param control Mocks control used to create the mocked future.
+ * @param <T> Batch worker result type.
+ * @return Expectation setters, so the caller can choose the call count.
+ * @throws Exception Propagated from the four-argument overload.
+ */
+ public static <T> IExpectationSetters<CompletableFuture<T>> expectBatchExecute(
+ BatchWorker<T> batchWorker,
+ Storage storage,
+ IMocksControl control) throws Exception {
+
+ return expectBatchExecute(batchWorker, storage, control, null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/ebfeb3e6/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
index f879827..ea0b89a 100644
--- a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
@@ -60,6 +60,7 @@ import org.apache.aurora.gen.Range;
import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.SchedulerModule.TaskEventBatchWorker;
import org.apache.aurora.scheduler.TaskIdGenerator;
import org.apache.aurora.scheduler.TaskIdGenerator.TaskIdGeneratorImpl;
import org.apache.aurora.scheduler.base.JobKeys;
@@ -125,6 +126,7 @@ import static org.apache.aurora.gen.ScheduleStatus.KILLED;
import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
import static org.apache.aurora.gen.ScheduleStatus.STARTING;
import static org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
+import static org.apache.aurora.scheduler.testing.BatchWorkerUtil.expectBatchExecute;
import static org.apache.aurora.scheduler.updater.UpdateFactory.UpdateFactoryImpl.expandInstanceIds;
import static org.easymock.EasyMock.expectLastCall;
import static org.junit.Assert.assertEquals;
@@ -164,7 +166,7 @@ public class JobUpdaterIT extends EasyMockTest {
}
@Before
- public void setUp() {
+ public void setUp() throws Exception {
// Avoid console spam due to stats registered multiple times.
Stats.flush();
ScheduledExecutorService executor = createMock(ScheduledExecutorService.class);
@@ -172,6 +174,7 @@ public class JobUpdaterIT extends EasyMockTest {
driver = createMock(Driver.class);
shutdownCommand = createMock(Command.class);
eventBus = new EventBus();
+ TaskEventBatchWorker batchWorker = createMock(TaskEventBatchWorker.class);
Injector injector = Guice.createInjector(
new UpdaterModule(executor),
@@ -195,6 +198,7 @@ public class JobUpdaterIT extends EasyMockTest {
bind(LockManager.class).to(LockManagerImpl.class);
bind(UUIDGenerator.class).to(UUIDGeneratorImpl.class);
bind(Lifecycle.class).toInstance(new Lifecycle(shutdownCommand));
+ bind(TaskEventBatchWorker.class).toInstance(batchWorker);
}
});
updater = injector.getInstance(JobUpdateController.class);
@@ -204,6 +208,7 @@ public class JobUpdaterIT extends EasyMockTest {
stateManager = injector.getInstance(StateManager.class);
eventBus.register(injector.getInstance(JobUpdateEventSubscriber.class));
subscriber = injector.getInstance(JobUpdateEventSubscriber.class);
+ expectBatchExecute(batchWorker, storage, control).anyTimes();
}
@After
[2/3] aurora git commit: Batching writes - Part 2 (of 3): Converting
cron jobs to use BatchWorker.
Posted by ma...@apache.org.
Batching writes - Part 2 (of 3): Converting cron jobs to use BatchWorker.
Reviewed at https://reviews.apache.org/r/51763/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/2cb43d61
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/2cb43d61
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/2cb43d61
Branch: refs/heads/master
Commit: 2cb43d61ecafb79b31d36332ef4713b9857b3c1a
Parents: ebfeb3e
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Fri Sep 16 14:17:26 2016 -0700
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Fri Sep 16 14:17:26 2016 -0700
----------------------------------------------------------------------
.../aurora/common/util/BackoffHelper.java | 8 +
.../aurora/common/util/BackoffHelperTest.java | 7 +
.../scheduler/cron/quartz/AuroraCronJob.java | 239 +++++++++++--------
.../scheduler/cron/quartz/CronModule.java | 25 +-
.../cron/quartz/AuroraCronJobTest.java | 107 ++++++---
.../aurora/scheduler/cron/quartz/CronIT.java | 4 +
6 files changed, 256 insertions(+), 134 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/2cb43d61/commons/src/main/java/org/apache/aurora/common/util/BackoffHelper.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/util/BackoffHelper.java b/commons/src/main/java/org/apache/aurora/common/util/BackoffHelper.java
index 8e73dd9..517c0ef 100644
--- a/commons/src/main/java/org/apache/aurora/common/util/BackoffHelper.java
+++ b/commons/src/main/java/org/apache/aurora/common/util/BackoffHelper.java
@@ -90,6 +90,14 @@ public class BackoffHelper {
}
/**
+ * Gets {@link BackoffStrategy} instance the BackoffHelper is initialized with.
+ * @return instance of {@link BackoffStrategy} used by BackoffHelper.
+ */
+ public BackoffStrategy getBackoffStrategy() {
+ return backoffStrategy;
+ }
+
+ /**
* Executes the given task using the configured backoff strategy until the task succeeds as
* indicated by returning a non-null value.
*
http://git-wip-us.apache.org/repos/asf/aurora/blob/2cb43d61/commons/src/test/java/org/apache/aurora/common/util/BackoffHelperTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/util/BackoffHelperTest.java b/commons/src/test/java/org/apache/aurora/common/util/BackoffHelperTest.java
index bc30990..012fbac 100644
--- a/commons/src/test/java/org/apache/aurora/common/util/BackoffHelperTest.java
+++ b/commons/src/test/java/org/apache/aurora/common/util/BackoffHelperTest.java
@@ -41,6 +41,13 @@ public class BackoffHelperTest extends EasyMockTest {
}
@Test
+ public void testGetBackoffStrategy() {
+ control.replay();
+
+ assertEquals(backoffStrategy, backoffHelper.getBackoffStrategy());
+ }
+
+ @Test
public void testDoUntilSuccess() throws Exception {
ExceptionalSupplier<Boolean, RuntimeException> task =
createMock(new Clazz<ExceptionalSupplier<Boolean, RuntimeException>>() { });
http://git-wip-us.apache.org/repos/asf/aurora/blob/2cb43d61/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java b/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java
index c07551e..7c8047a 100644
--- a/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java
+++ b/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java
@@ -13,29 +13,37 @@
*/
package org.apache.aurora.scheduler.cron.quartz;
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
import java.util.Date;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import javax.inject.Inject;
+import javax.inject.Qualifier;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
import org.apache.aurora.common.stats.Stats;
+import org.apache.aurora.common.stats.StatsProvider;
import org.apache.aurora.common.util.BackoffHelper;
import org.apache.aurora.gen.CronCollisionPolicy;
+import org.apache.aurora.scheduler.BatchWorker;
+import org.apache.aurora.scheduler.BatchWorker.NoResult;
import org.apache.aurora.scheduler.base.JobKeys;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.configuration.ConfigurationManager;
import org.apache.aurora.scheduler.cron.CronException;
import org.apache.aurora.scheduler.cron.SanitizedCronJob;
+import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
@@ -43,9 +51,14 @@ import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
+import org.quartz.PersistJobDataAfterExecution;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
import static java.util.Objects.requireNonNull;
import static com.google.common.base.Preconditions.checkState;
@@ -61,7 +74,8 @@ import static org.apache.aurora.gen.ScheduleStatus.KILLING;
* scheduler should therefore be configured with a large number of threads.
*/
@DisallowConcurrentExecution
-class AuroraCronJob implements Job {
+@PersistJobDataAfterExecution
+class AuroraCronJob implements Job, EventSubscriber {
private static final Logger LOG = LoggerFactory.getLogger(AuroraCronJob.class);
private static final AtomicLong CRON_JOB_TRIGGERS = Stats.exportLong("cron_job_triggers");
@@ -69,147 +83,166 @@ class AuroraCronJob implements Job {
private static final AtomicLong CRON_JOB_PARSE_FAILURES =
Stats.exportLong("cron_job_parse_failures");
private static final AtomicLong CRON_JOB_COLLISIONS = Stats.exportLong("cron_job_collisions");
+ private static final AtomicLong CRON_JOB_CONCURRENT_RUNS =
+ Stats.exportLong("cron_job_concurrent_runs");
@VisibleForTesting
static final Optional<String> KILL_AUDIT_MESSAGE = Optional.of("Killed by cronScheduler");
private final ConfigurationManager configurationManager;
- private final Storage storage;
private final StateManager stateManager;
private final BackoffHelper delayedStartBackoff;
+ private final BatchWorker<NoResult> batchWorker;
+ private final Set<IJobKey> killFollowups = Sets.newConcurrentHashSet();
+
+ /**
+ * Annotation for the max cron batch size.
+ */
+ @VisibleForTesting
+ @Qualifier
+ @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+ @interface CronMaxBatchSize { }
+
+ /**
+ * {@link BatchWorker} specialization injected into {@link AuroraCronJob}. Its work items
+ * produce {@link NoResult}.
+ */
+ static class CronBatchWorker extends BatchWorker<NoResult> {
+ /**
+ * @param storage Storage handed to the base {@link BatchWorker}.
+ * @param statsProvider Stats provider handed to the base {@link BatchWorker}.
+ * @param maxBatchSize Batch size limit, injected via the {@link CronMaxBatchSize} qualifier.
+ */
+ @Inject
+ CronBatchWorker(
+ Storage storage,
+ StatsProvider statsProvider,
+ @CronMaxBatchSize int maxBatchSize) {
+
+ super(storage, statsProvider, maxBatchSize);
+ }
+
+ // NOTE(review): presumably the name the base class uses to identify this worker (service
+ // and/or stat names) - confirm against BatchWorker.
+ @Override
+ protected String serviceName() {
+ return "CronBatchWorker";
+ }
+ }
@Inject
AuroraCronJob(
ConfigurationManager configurationManager,
Config config,
- Storage storage,
- StateManager stateManager) {
+ StateManager stateManager,
+ CronBatchWorker batchWorker) {
this.configurationManager = requireNonNull(configurationManager);
- this.storage = requireNonNull(storage);
this.stateManager = requireNonNull(stateManager);
+ this.batchWorker = requireNonNull(batchWorker);
this.delayedStartBackoff = requireNonNull(config.getDelayedStartBackoff());
}
- private static final class DeferredLaunch {
- private final ITaskConfig task;
- private final Set<Integer> instanceIds;
- private final Set<String> activeTaskIds;
-
- DeferredLaunch(ITaskConfig task, Set<Integer> instanceIds, Set<String> activeTaskIds) {
- this.task = task;
- this.instanceIds = instanceIds;
- this.activeTaskIds = activeTaskIds;
- }
- }
-
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
// We assume quartz prevents concurrent runs of this job for a given job key. This allows us
// to avoid races where we might kill another run's tasks.
checkState(context.getJobDetail().isConcurrentExectionDisallowed());
- doExecute(Quartz.auroraJobKey(context.getJobDetail().getKey()));
+ doExecute(context);
}
@VisibleForTesting
- void doExecute(final IJobKey key) throws JobExecutionException {
+ void doExecute(JobExecutionContext context) throws JobExecutionException {
+ final IJobKey key = Quartz.auroraJobKey(context.getJobDetail().getKey());
final String path = JobKeys.canonicalString(key);
- final Optional<DeferredLaunch> deferredLaunch = storage.write(
- (MutateWork.Quiet<Optional<DeferredLaunch>>) storeProvider -> {
- Optional<IJobConfiguration> config = storeProvider.getCronJobStore().fetchJob(key);
- if (!config.isPresent()) {
- LOG.warn(
- "Cron was triggered for {} but no job with that key was found in storage.",
- path);
- CRON_JOB_MISFIRES.incrementAndGet();
- return Optional.absent();
- }
-
- SanitizedCronJob cronJob;
- try {
- cronJob = SanitizedCronJob.fromUnsanitized(configurationManager, config.get());
- } catch (ConfigurationManager.TaskDescriptionException | CronException e) {
- LOG.warn(
- "Invalid cron job for {} in storage - failed to parse with {}", key, e);
- CRON_JOB_PARSE_FAILURES.incrementAndGet();
- return Optional.absent();
- }
-
- CronCollisionPolicy collisionPolicy = cronJob.getCronCollisionPolicy();
- LOG.info(
- "Cron triggered for {} at {} with policy {}", path, new Date(), collisionPolicy);
- CRON_JOB_TRIGGERS.incrementAndGet();
-
- final Query.Builder activeQuery = Query.jobScoped(key).active();
- Set<String> activeTasks =
- Tasks.ids(storeProvider.getTaskStore().fetchTasks(activeQuery));
+ // Guard against overlapping runs of the same cron job when an earlier trigger is still
+ // waiting for its killed tasks to terminate. The KILL_EXISTING branch stores a
+ // "work in progress" token in the job data map under the canonical job path, and the
+ // delayed-launch completion callback adds the job key to killFollowups. A token with a
+ // matching killFollowups entry means the earlier run has finished: clear both and proceed.
+ // A token without one means the earlier run is still in flight: drop this trigger.
+ // Note that the concurrent-runs stat is incremented in both cases.
+ if (context.getJobDetail().getJobDataMap().containsKey(path)) {
+ CRON_JOB_CONCURRENT_RUNS.incrementAndGet();
+ if (killFollowups.contains(key)) {
+ context.getJobDetail().getJobDataMap().remove(path);
+ killFollowups.remove(key);
+ LOG.info("Resetting job context for cron " + path);
+ } else {
+ LOG.info("Ignoring trigger as another concurrent run is active for cron " + path);
+ return;
+ }
+ }
- ITaskConfig task = cronJob.getSanitizedConfig().getJobConfig().getTaskConfig();
- Set<Integer> instanceIds = cronJob.getSanitizedConfig().getInstanceIds();
- if (activeTasks.isEmpty()) {
- stateManager.insertPendingTasks(storeProvider, task, instanceIds);
+ CompletableFuture<NoResult> scheduleResult = batchWorker.<NoResult>execute(storeProvider -> {
+ Optional<IJobConfiguration> config = storeProvider.getCronJobStore().fetchJob(key);
+ if (!config.isPresent()) {
+ LOG.warn("Cron was triggered for {} but no job with that key was found in storage.", path);
+ CRON_JOB_MISFIRES.incrementAndGet();
+ return BatchWorker.NO_RESULT;
+ }
- return Optional.absent();
- }
+ SanitizedCronJob cronJob;
+ try {
+ cronJob = SanitizedCronJob.fromUnsanitized(configurationManager, config.get());
+ } catch (ConfigurationManager.TaskDescriptionException | CronException e) {
+ LOG.warn("Invalid cron job for {} in storage - failed to parse with {}", key, e);
+ CRON_JOB_PARSE_FAILURES.incrementAndGet();
+ return BatchWorker.NO_RESULT;
+ }
- CRON_JOB_COLLISIONS.incrementAndGet();
- switch (collisionPolicy) {
- case KILL_EXISTING:
- return Optional.of(new DeferredLaunch(task, instanceIds, activeTasks));
+ CronCollisionPolicy collisionPolicy = cronJob.getCronCollisionPolicy();
+ LOG.info("Cron triggered for {} at {} with policy {}", path, new Date(), collisionPolicy);
+ CRON_JOB_TRIGGERS.incrementAndGet();
- case RUN_OVERLAP:
- LOG.error("Ignoring trigger for job {} with deprecated collision"
- + "policy RUN_OVERLAP due to unterminated active tasks.", path);
- return Optional.absent();
+ final Query.Builder activeQuery = Query.jobScoped(key).active();
+ Set<String> activeTasks = Tasks.ids(storeProvider.getTaskStore().fetchTasks(activeQuery));
- case CANCEL_NEW:
- return Optional.absent();
+ ITaskConfig task = cronJob.getSanitizedConfig().getJobConfig().getTaskConfig();
+ Set<Integer> instanceIds = cronJob.getSanitizedConfig().getInstanceIds();
+ if (activeTasks.isEmpty()) {
+ stateManager.insertPendingTasks(storeProvider, task, instanceIds);
+ return BatchWorker.NO_RESULT;
+ }
- default:
- LOG.error("Unrecognized cron collision policy: " + collisionPolicy);
- return Optional.absent();
+ CRON_JOB_COLLISIONS.incrementAndGet();
+ switch (collisionPolicy) {
+ case KILL_EXISTING:
+ for (String taskId : activeTasks) {
+ stateManager.changeState(
+ storeProvider,
+ taskId,
+ Optional.absent(),
+ KILLING,
+ KILL_AUDIT_MESSAGE);
}
- }
- );
- if (!deferredLaunch.isPresent()) {
- return;
- }
-
- storage.write((NoResult.Quiet) storeProvider -> {
- for (String taskId : deferredLaunch.get().activeTaskIds) {
- stateManager.changeState(
- storeProvider,
- taskId,
- Optional.absent(),
- KILLING,
- KILL_AUDIT_MESSAGE);
+ LOG.info("Waiting for job to terminate before launching cron job " + path);
+ // Use job detail map to signal a "work in progress" condition to subsequent triggers.
+ context.getJobDetail().getJobDataMap().put(path, null);
+ // Re-check on each backoff step until the killed tasks are gone, then launch the new run.
+ batchWorker.executeWithReplay(
+ delayedStartBackoff.getBackoffStrategy(),
+ store -> {
+ Query.Builder query = Query.taskScoped(activeTasks).active();
+ // Read through the provider handed to this replay, not the enclosing work item's
+ // storeProvider, so the check and the insert below use the same store.
+ if (Iterables.isEmpty(store.getTaskStore().fetchTasks(query))) {
+ LOG.info("Initiating delayed launch of cron " + path);
+ stateManager.insertPendingTasks(store, task, instanceIds);
+ return new BatchWorker.Result<>(true, null);
+ } else {
+ LOG.info("Not yet safe to run cron " + path);
+ return new BatchWorker.Result<>(false, null);
+ }
+ })
+ .thenAccept(ignored -> {
+ // Signals completion to the concurrent-run guard at the top of doExecute.
+ killFollowups.add(key);
+ LOG.info("Finished delayed launch for cron " + path);
+ });
+ break;
+
+ case RUN_OVERLAP:
+ // Deprecated policy: with active tasks present the trigger is dropped, not overlapped.
+ LOG.error("Ignoring trigger for job {} with deprecated collision "
+ + "policy RUN_OVERLAP due to unterminated active tasks.", path);
+ break;
+
+ case CANCEL_NEW:
+ break;
+
+ default:
+ LOG.error("Unrecognized cron collision policy: " + collisionPolicy);
}
+ return BatchWorker.NO_RESULT;
});
- LOG.info("Waiting for job to terminate before launching cron job {}.", path);
-
- final Query.Builder query = Query.taskScoped(deferredLaunch.get().activeTaskIds).active();
try {
- // NOTE: We block the quartz execution thread here until we've successfully killed our
- // ancestor. We mitigate this by using a cached thread pool for quartz.
- delayedStartBackoff.doUntilSuccess(() -> {
- if (Iterables.isEmpty(Storage.Util.fetchTasks(storage, query))) {
- LOG.info("Initiating delayed launch of cron " + path);
- storage.write((NoResult.Quiet) storeProvider -> stateManager.insertPendingTasks(
- storeProvider,
- deferredLaunch.get().task,
- deferredLaunch.get().instanceIds));
-
- return true;
- } else {
- LOG.info("Not yet safe to run cron " + path);
- return false;
- }
- });
- } catch (InterruptedException e) {
+ scheduleResult.get();
+ } catch (InterruptedException e) {
LOG.warn("Interrupted while trying to launch cron " + path, e);
Thread.currentThread().interrupt();
throw new JobExecutionException(e);
+ } catch (ExecutionException e) {
+ // The batched work itself failed. No interrupt occurred, so the thread's interrupt flag
+ // must not be set here.
+ LOG.warn("Failed to launch cron " + path, e);
+ throw new JobExecutionException(e);
http://git-wip-us.apache.org/repos/asf/aurora/blob/2cb43d61/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronModule.java b/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronModule.java
index 155d702..9c88a2a 100644
--- a/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronModule.java
@@ -21,16 +21,19 @@ import javax.inject.Singleton;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
+import com.google.inject.TypeLiteral;
import org.apache.aurora.common.args.Arg;
import org.apache.aurora.common.args.CmdLine;
+import org.apache.aurora.common.args.constraints.Positive;
import org.apache.aurora.common.quantity.Amount;
import org.apache.aurora.common.quantity.Time;
import org.apache.aurora.common.util.BackoffHelper;
-import org.apache.aurora.scheduler.SchedulerServicesModule;
import org.apache.aurora.scheduler.cron.CronJobManager;
import org.apache.aurora.scheduler.cron.CronPredictor;
import org.apache.aurora.scheduler.cron.CronScheduler;
+import org.apache.aurora.scheduler.cron.quartz.AuroraCronJob.CronBatchWorker;
+import org.apache.aurora.scheduler.events.PubsubEventModule;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.impl.StdSchedulerFactory;
@@ -38,6 +41,7 @@ import org.quartz.simpl.SimpleThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.aurora.scheduler.SchedulerServicesModule.addSchedulerActiveServiceBinding;
import static org.quartz.impl.StdSchedulerFactory.PROP_SCHED_INSTANCE_ID;
import static org.quartz.impl.StdSchedulerFactory.PROP_SCHED_MAKE_SCHEDULER_THREAD_DAEMON;
import static org.quartz.impl.StdSchedulerFactory.PROP_SCHED_NAME;
@@ -55,7 +59,7 @@ public class CronModule extends AbstractModule {
@CmdLine(name = "cron_scheduler_num_threads",
help = "Number of threads to use for the cron scheduler thread pool.")
- private static final Arg<Integer> NUM_THREADS = Arg.create(100);
+ private static final Arg<Integer> NUM_THREADS = Arg.create(10);
@CmdLine(name = "cron_timezone", help = "TimeZone to use for cron predictions.")
private static final Arg<String> CRON_TIMEZONE = Arg.create("GMT");
@@ -63,13 +67,18 @@ public class CronModule extends AbstractModule {
@CmdLine(name = "cron_start_initial_backoff", help =
"Initial backoff delay while waiting for a previous cron run to be killed.")
public static final Arg<Amount<Long, Time>> CRON_START_INITIAL_BACKOFF =
- Arg.create(Amount.of(1L, Time.SECONDS));
+ Arg.create(Amount.of(5L, Time.SECONDS));
@CmdLine(name = "cron_start_max_backoff", help =
"Max backoff delay while waiting for a previous cron run to be killed.")
public static final Arg<Amount<Long, Time>> CRON_START_MAX_BACKOFF =
Arg.create(Amount.of(1L, Time.MINUTES));
+ @Positive
+ @CmdLine(name = "cron_scheduling_max_batch_size",
+ help = "The maximum number of triggered cron jobs that can be processed in a batch.")
+ private static final Arg<Integer> CRON_MAX_BATCH_SIZE = Arg.create(10);
+
// Global per-JVM ID number generator for the provided Quartz Scheduler.
private static final AtomicLong ID_GENERATOR = new AtomicLong();
@@ -90,8 +99,16 @@ public class CronModule extends AbstractModule {
bind(AuroraCronJob.Config.class).toInstance(new AuroraCronJob.Config(
new BackoffHelper(CRON_START_INITIAL_BACKOFF.get(), CRON_START_MAX_BACKOFF.get())));
+ PubsubEventModule.bindSubscriber(binder(), AuroraCronJob.class);
+
bind(CronLifecycle.class).in(Singleton.class);
- SchedulerServicesModule.addSchedulerActiveServiceBinding(binder()).to(CronLifecycle.class);
+ addSchedulerActiveServiceBinding(binder()).to(CronLifecycle.class);
+
+ bind(new TypeLiteral<Integer>() { })
+ .annotatedWith(AuroraCronJob.CronMaxBatchSize.class)
+ .toInstance(CRON_MAX_BATCH_SIZE.get());
+ bind(CronBatchWorker.class).in(Singleton.class);
+ addSchedulerActiveServiceBinding(binder()).to(CronBatchWorker.class);
}
@Provides
http://git-wip-us.apache.org/repos/asf/aurora/blob/2cb43d61/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java b/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java
index 5c64ff2..fb06c28 100644
--- a/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java
@@ -13,17 +13,23 @@
*/
package org.apache.aurora.scheduler.cron.quartz;
+import java.util.HashMap;
+import java.util.concurrent.CompletableFuture;
+
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableSet;
-import org.apache.aurora.common.base.ExceptionalSupplier;
import org.apache.aurora.common.testing.easymock.EasyMockTest;
import org.apache.aurora.common.util.BackoffHelper;
import org.apache.aurora.gen.AssignedTask;
import org.apache.aurora.gen.CronCollisionPolicy;
import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.scheduler.BatchWorker;
+import org.apache.aurora.scheduler.BatchWorker.RepeatableWork;
+import org.apache.aurora.scheduler.base.JobKeys;
import org.apache.aurora.scheduler.base.TaskTestUtil;
+import org.apache.aurora.scheduler.cron.quartz.AuroraCronJob.CronBatchWorker;
import org.apache.aurora.scheduler.state.StateChangeResult;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.storage.Storage;
@@ -31,11 +37,17 @@ import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
import org.apache.aurora.scheduler.storage.db.DbUtil;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.easymock.Capture;
-import org.easymock.EasyMock;
import org.junit.Before;
import org.junit.Test;
+import org.quartz.JobDataMap;
+import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
+import org.quartz.impl.JobDetailImpl;
+import static org.apache.aurora.scheduler.cron.quartz.QuartzTestUtil.AURORA_JOB_KEY;
+import static org.apache.aurora.scheduler.testing.BatchWorkerUtil.expectBatchExecute;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.capture;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
@@ -43,50 +55,61 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class AuroraCronJobTest extends EasyMockTest {
- public static final String TASK_ID = "A";
+ private static final String TASK_ID = "A";
+ private JobDetailImpl jobDetails;
private Storage storage;
private StateManager stateManager;
private BackoffHelper backoffHelper;
-
+ private CronBatchWorker batchWorker;
+ private JobExecutionContext context;
private AuroraCronJob auroraCronJob;
@Before
- public void setUp() {
+ public void setUp() throws Exception {
storage = DbUtil.createStorage();
stateManager = createMock(StateManager.class);
backoffHelper = createMock(BackoffHelper.class);
+ context = createMock(JobExecutionContext.class);
+
+ jobDetails = new JobDetailImpl();
+ jobDetails.setKey(Quartz.jobKey(AURORA_JOB_KEY));
+ jobDetails.setJobDataMap(new JobDataMap(new HashMap()));
+ expect(context.getJobDetail()).andReturn(jobDetails).anyTimes();
+
+ batchWorker = createMock(CronBatchWorker.class);
+ expectBatchExecute(batchWorker, storage, control).anyTimes();
auroraCronJob = new AuroraCronJob(
TaskTestUtil.CONFIGURATION_MANAGER,
- new AuroraCronJob.Config(backoffHelper), storage, stateManager);
+ new AuroraCronJob.Config(backoffHelper),
+ stateManager,
+ batchWorker);
}
@Test
public void testExecuteNonexistentIsNoop() throws JobExecutionException {
control.replay();
- auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY);
+ auroraCronJob.doExecute(context);
}
@Test
public void testEmptyStorage() throws JobExecutionException {
- stateManager.insertPendingTasks(
- EasyMock.anyObject(),
- EasyMock.anyObject(),
- EasyMock.anyObject());
+ stateManager.insertPendingTasks(anyObject(), anyObject(), anyObject());
expectLastCall().times(3);
control.replay();
+
populateStorage(CronCollisionPolicy.CANCEL_NEW);
- auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY);
- storage = DbUtil.createStorage();
+ auroraCronJob.doExecute(context);
- populateStorage(CronCollisionPolicy.KILL_EXISTING);
- auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY);
storage = DbUtil.createStorage();
+ populateStorage(CronCollisionPolicy.KILL_EXISTING);
+ auroraCronJob.doExecute(context);
+ storage = DbUtil.createStorage();
populateStorage(CronCollisionPolicy.RUN_OVERLAP);
- auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY);
+ auroraCronJob.doExecute(context);
}
@Test
@@ -95,35 +118,65 @@ public class AuroraCronJobTest extends EasyMockTest {
populateTaskStore();
populateStorage(CronCollisionPolicy.CANCEL_NEW);
- auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY);
+ auroraCronJob.doExecute(context);
+ }
+
+ @Test
+ public void testOverlap() throws JobExecutionException {
+ control.replay();
+
+ populateTaskStore();
+ populateStorage(CronCollisionPolicy.RUN_OVERLAP);
+ auroraCronJob.doExecute(context);
}
@Test
public void testKillExisting() throws Exception {
- Capture<ExceptionalSupplier<Boolean, RuntimeException>> capture = createCapture();
+ Capture<RepeatableWork<BatchWorker.NoResult>> killCapture = createCapture();
+ CompletableFuture<BatchWorker.NoResult> killResult = new CompletableFuture<>();
+ expect(batchWorker.executeWithReplay(anyObject(), capture(killCapture))).andReturn(killResult);
+ expect(backoffHelper.getBackoffStrategy()).andReturn(null).anyTimes();
expect(stateManager.changeState(
- EasyMock.anyObject(),
+ anyObject(),
eq(TASK_ID),
eq(Optional.absent()),
eq(ScheduleStatus.KILLING),
eq(AuroraCronJob.KILL_AUDIT_MESSAGE)))
.andReturn(StateChangeResult.SUCCESS);
- backoffHelper.doUntilSuccess(EasyMock.capture(capture));
- stateManager.insertPendingTasks(
- EasyMock.anyObject(),
- EasyMock.anyObject(),
- EasyMock.anyObject());
+ stateManager.insertPendingTasks(anyObject(), anyObject(), anyObject());
+ expectLastCall().times(2);
control.replay();
populateStorage(CronCollisionPolicy.KILL_EXISTING);
populateTaskStore();
- auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY);
- assertFalse(capture.getValue().get());
+ auroraCronJob.doExecute(context);
+
storage.write(
(NoResult.Quiet) storeProvider -> storeProvider.getUnsafeTaskStore().deleteAllTasks());
- assertTrue(capture.getValue().get());
+ storage.write((NoResult.Quiet) store -> killCapture.getValue().apply(store));
+
+ // Simulate a trigger in progress.
+ jobDetails.getJobDataMap().put(JobKeys.canonicalString(AURORA_JOB_KEY), null);
+ assertFalse(jobDetails.getJobDataMap().isEmpty());
+
+ // Attempt a concurrent run that must be rejected.
+ auroraCronJob.doExecute(context);
+
+ // Complete previous run and trigger another one.
+ killResult.complete(BatchWorker.NO_RESULT);
+ auroraCronJob.doExecute(context);
+ assertTrue(jobDetails.getJobDataMap().isEmpty());
+ }
+
+ @Test
+ public void testNoConcurrentRun() throws Exception {
+ jobDetails.getJobDataMap().put(JobKeys.canonicalString(AURORA_JOB_KEY), null);
+
+ control.replay();
+
+ auroraCronJob.doExecute(context);
}
private void populateTaskStore() {
http://git-wip-us.apache.org/repos/asf/aurora/blob/2cb43d61/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java
index 1c0a3fa..8556253 100644
--- a/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java
@@ -21,6 +21,7 @@ import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.util.Modules;
+import org.apache.aurora.common.stats.StatsProvider;
import org.apache.aurora.common.testing.easymock.EasyMockTest;
import org.apache.aurora.common.util.Clock;
import org.apache.aurora.gen.Container;
@@ -34,6 +35,7 @@ import org.apache.aurora.scheduler.configuration.ConfigurationManager;
import org.apache.aurora.scheduler.cron.CronJobManager;
import org.apache.aurora.scheduler.cron.CrontabEntry;
import org.apache.aurora.scheduler.cron.SanitizedCronJob;
+import org.apache.aurora.scheduler.events.EventSink;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
@@ -95,6 +97,8 @@ public class CronIT extends EasyMockTest {
bind(Clock.class).toInstance(Clock.SYSTEM_CLOCK);
bind(StateManager.class).toInstance(stateManager);
bind(Storage.class).toInstance(storage);
+ bind(StatsProvider.class).toInstance(createMock(StatsProvider.class));
+ bind(EventSink.class).toInstance(createMock(EventSink.class));
}
});
}
[3/3] aurora git commit: Batching writes - Part 3 (of 3): Converting
TaskScheduler to use BatchWorker.
Posted by ma...@apache.org.
Batching writes - Part 3 (of 3): Converting TaskScheduler to use BatchWorker.
Reviewed at https://reviews.apache.org/r/51765/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/496397aa
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/496397aa
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/496397aa
Branch: refs/heads/master
Commit: 496397aa5a9534d04bcc273e50a6b9de204ae133
Parents: 2cb43d6
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Fri Sep 16 14:17:48 2016 -0700
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Fri Sep 16 14:17:48 2016 -0700
----------------------------------------------------------------------
.../aurora/benchmark/SchedulingBenchmarks.java | 20 ++++---
.../scheduler/scheduling/SchedulingModule.java | 13 ++++
.../aurora/scheduler/scheduling/TaskGroups.java | 62 ++++++++++++++++++--
.../scheduler/scheduling/TaskScheduler.java | 17 ++----
.../scheduler/http/AbstractJettyTest.java | 2 +
.../scheduler/scheduling/TaskGroupsTest.java | 35 +++++++----
.../scheduling/TaskSchedulerImplTest.java | 19 +++---
7 files changed, 125 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/496397aa/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
index 9d0d40b..6f1cbfb 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java
@@ -190,9 +190,11 @@ public class SchedulingBenchmarks {
private void fillUpCluster(int numOffers) {
Set<IScheduledTask> tasksToAssign = buildClusterTasks(numOffers);
saveTasks(tasksToAssign);
- for (IScheduledTask scheduledTask : tasksToAssign) {
- taskScheduler.schedule(scheduledTask.getAssignedTask().getTaskId());
- }
+ storage.write((NoResult.Quiet) store -> {
+ for (IScheduledTask scheduledTask : tasksToAssign) {
+ taskScheduler.schedule(store, scheduledTask.getAssignedTask().getTaskId());
+ }
+ });
}
private void saveTasks(final Set<IScheduledTask> tasks) {
@@ -219,11 +221,13 @@ public class SchedulingBenchmarks {
*/
@Benchmark
public boolean runBenchmark() {
- boolean result = false;
- for (IScheduledTask task : settings.getTasks()) {
- result = taskScheduler.schedule(task.getAssignedTask().getTaskId());
- }
- return result;
+ return storage.write((Storage.MutateWork.Quiet<Boolean>) store -> {
+ boolean result = false;
+ for (IScheduledTask task : settings.getTasks()) {
+ result = taskScheduler.schedule(store, task.getAssignedTask().getTaskId());
+ }
+ return result;
+ });
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/496397aa/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java b/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java
index 11e8033..664bc6c 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/SchedulingModule.java
@@ -31,6 +31,8 @@ import org.apache.aurora.scheduler.events.PubsubEventModule;
import org.apache.aurora.scheduler.preemptor.BiCache;
import org.apache.aurora.scheduler.scheduling.RescheduleCalculator.RescheduleCalculatorImpl;
+import static org.apache.aurora.scheduler.SchedulerServicesModule.addSchedulerActiveServiceBinding;
+
/**
* Binding module for task scheduling logic.
*/
@@ -83,6 +85,11 @@ public class SchedulingModule extends AbstractModule {
private static final Arg<Amount<Long, Time>> RESERVATION_DURATION =
Arg.create(Amount.of(3L, Time.MINUTES));
+ @Positive
+ @CmdLine(name = "scheduling_max_batch_size",
+ help = "The maximum number of scheduling attempts that can be processed in a batch.")
+ private static final Arg<Integer> SCHEDULING_MAX_BATCH_SIZE = Arg.create(3);
+
@Override
protected void configure() {
install(new PrivateModule() {
@@ -109,6 +116,12 @@ public class SchedulingModule extends AbstractModule {
});
PubsubEventModule.bindSubscriber(binder(), TaskGroups.class);
+ bind(new TypeLiteral<Integer>() { })
+ .annotatedWith(TaskGroups.SchedulingMaxBatchSize.class)
+ .toInstance(SCHEDULING_MAX_BATCH_SIZE.get());
+ bind(TaskGroups.TaskGroupBatchWorker.class).in(Singleton.class);
+ addSchedulerActiveServiceBinding(binder()).to(TaskGroups.TaskGroupBatchWorker.class);
+
install(new PrivateModule() {
@Override
protected void configure() {
http://git-wip-us.apache.org/repos/asf/aurora/blob/496397aa/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java
index c044ebe..d390c07 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java
@@ -13,12 +13,19 @@
*/
package org.apache.aurora.scheduler.scheduling;
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
import javax.inject.Inject;
+import javax.inject.Qualifier;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
@@ -28,7 +35,9 @@ import com.google.common.util.concurrent.RateLimiter;
import org.apache.aurora.common.quantity.Amount;
import org.apache.aurora.common.quantity.Time;
import org.apache.aurora.common.stats.SlidingStats;
+import org.apache.aurora.common.stats.StatsProvider;
import org.apache.aurora.common.util.BackoffStrategy;
+import org.apache.aurora.scheduler.BatchWorker;
import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
import org.apache.aurora.scheduler.async.DelayExecutor;
import org.apache.aurora.scheduler.base.TaskGroupKey;
@@ -36,9 +45,14 @@ import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
+import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
import static java.util.Objects.requireNonNull;
import static org.apache.aurora.gen.ScheduleStatus.PENDING;
@@ -61,12 +75,38 @@ public class TaskGroups implements EventSubscriber {
private final long firstScheduleDelay;
private final BackoffStrategy backoff;
private final RescheduleCalculator rescheduleCalculator;
+ private final BatchWorker<Boolean> batchWorker;
// Track the penalties of tasks at the time they were scheduled. This is to provide data that
// may influence the selection of a different backoff strategy.
private final SlidingStats scheduledTaskPenalties =
new SlidingStats("scheduled_task_penalty", "ms");
+ /**
+ * Annotation for the max scheduling batch size.
+ */
+ @VisibleForTesting
+ @Qualifier
+ @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+ public @interface SchedulingMaxBatchSize { }
+
+ @VisibleForTesting
+ public static class TaskGroupBatchWorker extends BatchWorker<Boolean> {
+ @Inject
+ TaskGroupBatchWorker(
+ Storage storage,
+ StatsProvider statsProvider,
+ @SchedulingMaxBatchSize int maxBatchSize) {
+
+ super(storage, statsProvider, maxBatchSize);
+ }
+
+ @Override
+ protected String serviceName() {
+ return "TaskGroupBatchWorker";
+ }
+ }
+
public static class TaskGroupsSettings {
private final Amount<Long, Time> firstScheduleDelay;
private final BackoffStrategy taskGroupBackoff;
@@ -88,7 +128,8 @@ public class TaskGroups implements EventSubscriber {
@AsyncExecutor DelayExecutor executor,
TaskGroupsSettings settings,
TaskScheduler taskScheduler,
- RescheduleCalculator rescheduleCalculator) {
+ RescheduleCalculator rescheduleCalculator,
+ TaskGroupBatchWorker batchWorker) {
requireNonNull(settings.firstScheduleDelay);
Preconditions.checkArgument(settings.firstScheduleDelay.getValue() > 0);
@@ -99,10 +140,11 @@ public class TaskGroups implements EventSubscriber {
this.firstScheduleDelay = settings.firstScheduleDelay.as(Time.MILLISECONDS);
this.backoff = requireNonNull(settings.taskGroupBackoff);
this.rescheduleCalculator = requireNonNull(rescheduleCalculator);
+ this.batchWorker = requireNonNull(batchWorker);
- this.taskScheduler = taskId -> {
+ this.taskScheduler = (store, taskId) -> {
settings.rateLimiter.acquire();
- return taskScheduler.schedule(taskId);
+ return taskScheduler.schedule(store, taskId);
};
}
@@ -120,10 +162,20 @@ public class TaskGroups implements EventSubscriber {
Runnable monitor = new Runnable() {
@Override
public void run() {
- Optional<String> taskId = group.peek();
+ final Optional<String> taskId = group.peek();
long penaltyMs = 0;
if (taskId.isPresent()) {
- if (taskScheduler.schedule(taskId.get())) {
+ CompletableFuture<Boolean> result = batchWorker.execute(storeProvider ->
+ taskScheduler.schedule(storeProvider, taskId.get()));
+ boolean isScheduled = false;
+ try {
+ isScheduled = result.get();
+ } catch (ExecutionException | InterruptedException e) {
+ Thread.currentThread().interrupt();
+ Throwables.propagate(e);
+ }
+
+ if (isScheduled) {
scheduledTaskPenalties.accumulate(group.getPenaltyMs());
group.remove(taskId.get());
if (group.hasMore()) {
http://git-wip-us.apache.org/repos/asf/aurora/blob/496397aa/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
index d266f6a..207d38d 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskScheduler.java
@@ -38,7 +38,6 @@ import org.apache.aurora.scheduler.preemptor.BiCache;
import org.apache.aurora.scheduler.preemptor.Preemptor;
import org.apache.aurora.scheduler.resources.ResourceBag;
import org.apache.aurora.scheduler.state.TaskAssigner;
-import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -63,11 +62,12 @@ public interface TaskScheduler extends EventSubscriber {
/**
* Attempts to schedule a task, possibly performing irreversible actions.
*
+ * @param storeProvider {@code MutableStoreProvider} instance to access data store.
* @param taskId The task to attempt to schedule.
* @return {@code true} if the task was scheduled, {@code false} otherwise. The caller should
* call schedule again if {@code false} is returned.
*/
- boolean schedule(String taskId);
+ boolean schedule(MutableStoreProvider storeProvider, String taskId);
/**
* An asynchronous task scheduler. Scheduling of tasks is performed on a delay, where each task
@@ -86,7 +86,6 @@ public interface TaskScheduler extends EventSubscriber {
private static final Logger LOG = LoggerFactory.getLogger(TaskSchedulerImpl.class);
- private final Storage storage;
private final TaskAssigner assigner;
private final Preemptor preemptor;
private final ExecutorSettings executorSettings;
@@ -98,25 +97,22 @@ public interface TaskScheduler extends EventSubscriber {
@Inject
TaskSchedulerImpl(
- Storage storage,
TaskAssigner assigner,
Preemptor preemptor,
ExecutorSettings executorSettings,
BiCache<String, TaskGroupKey> reservations) {
- this.storage = requireNonNull(storage);
this.assigner = requireNonNull(assigner);
this.preemptor = requireNonNull(preemptor);
this.executorSettings = requireNonNull(executorSettings);
this.reservations = requireNonNull(reservations);
}
- @Timed("task_schedule_attempt")
- @Override
- public boolean schedule(final String taskId) {
+ @Timed ("task_schedule_attempt")
+ public boolean schedule(MutableStoreProvider store, String taskId) {
attemptsFired.incrementAndGet();
try {
- return storage.write(store -> scheduleTask(store, taskId));
+ return scheduleTask(store, taskId);
} catch (RuntimeException e) {
// We catch the generic unchecked exception here to ensure tasks are not abandoned
// if there is a transient issue resulting in an unchecked exception.
@@ -126,8 +122,7 @@ public interface TaskScheduler extends EventSubscriber {
}
}
- @Timed("task_schedule_attempt_locked")
- protected boolean scheduleTask(MutableStoreProvider store, String taskId) {
+ private boolean scheduleTask(MutableStoreProvider store, String taskId) {
LOG.debug("Attempting to schedule task " + taskId);
IAssignedTask assignedTask = Iterables.getOnlyElement(
Iterables.transform(
http://git-wip-us.apache.org/repos/asf/aurora/blob/496397aa/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java b/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java
index c2ceb4e..c1c3eca 100644
--- a/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java
@@ -54,6 +54,7 @@ import org.apache.aurora.scheduler.cron.CronJobManager;
import org.apache.aurora.scheduler.http.api.GsonMessageBodyHandler;
import org.apache.aurora.scheduler.offers.OfferManager;
import org.apache.aurora.scheduler.scheduling.RescheduleCalculator;
+import org.apache.aurora.scheduler.scheduling.TaskGroups;
import org.apache.aurora.scheduler.scheduling.TaskGroups.TaskGroupsSettings;
import org.apache.aurora.scheduler.scheduling.TaskScheduler;
import org.apache.aurora.scheduler.state.LockManager;
@@ -128,6 +129,7 @@ public abstract class AbstractJettyTest extends EasyMockTest {
bindMock(TaskScheduler.class);
bindMock(TierManager.class);
bindMock(Thread.UncaughtExceptionHandler.class);
+ bindMock(TaskGroups.TaskGroupBatchWorker.class);
bind(ServletContextListener.class).toProvider(() -> {
return makeServletContextListener(injector, getChildServletModule());
http://git-wip-us.apache.org/repos/asf/aurora/blob/496397aa/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java
index 95cf25e..8872962 100644
--- a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskGroupsTest.java
@@ -29,15 +29,20 @@ import org.apache.aurora.scheduler.async.DelayExecutor;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
import org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
+import org.apache.aurora.scheduler.scheduling.TaskGroups.TaskGroupBatchWorker;
import org.apache.aurora.scheduler.scheduling.TaskGroups.TaskGroupsSettings;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
import org.apache.aurora.scheduler.testing.FakeScheduledExecutor;
import org.junit.Before;
import org.junit.Test;
import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
import static org.apache.aurora.gen.ScheduleStatus.INIT;
+import static org.apache.aurora.scheduler.testing.BatchWorkerUtil.expectBatchExecute;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
public class TaskGroupsTest extends EasyMockTest {
@@ -52,26 +57,33 @@ public class TaskGroupsTest extends EasyMockTest {
private FakeScheduledExecutor clock;
private RescheduleCalculator rescheduleCalculator;
private TaskGroups taskGroups;
+ private TaskGroupBatchWorker batchWorker;
+ private StorageTestUtil storageUtil;
@Before
public void setUp() throws Exception {
+ storageUtil = new StorageTestUtil(this);
+ storageUtil.expectOperations();
DelayExecutor executor = createMock(DelayExecutor.class);
clock = FakeScheduledExecutor.fromDelayExecutor(executor);
backoffStrategy = createMock(BackoffStrategy.class);
taskScheduler = createMock(TaskScheduler.class);
rateLimiter = createMock(RateLimiter.class);
rescheduleCalculator = createMock(RescheduleCalculator.class);
+ batchWorker = createMock(TaskGroupBatchWorker.class);
taskGroups = new TaskGroups(
executor,
new TaskGroupsSettings(FIRST_SCHEDULE_DELAY, backoffStrategy, rateLimiter),
taskScheduler,
- rescheduleCalculator);
+ rescheduleCalculator,
+ batchWorker);
}
@Test
- public void testEvaluatedAfterFirstSchedulePenalty() {
+ public void testEvaluatedAfterFirstSchedulePenalty() throws Exception {
expect(rateLimiter.acquire()).andReturn(0D);
- expect(taskScheduler.schedule(TASK_A_ID)).andReturn(true);
+ expect(taskScheduler.schedule(anyObject(), eq(TASK_A_ID))).andReturn(true);
+ expectBatchExecute(batchWorker, storageUtil.storage, control, true).anyTimes();
control.replay();
@@ -80,10 +92,10 @@ public class TaskGroupsTest extends EasyMockTest {
}
@Test
- public void testTaskDeletedBeforeEvaluating() {
+ public void testTaskDeletedBeforeEvaluating() throws Exception {
final IScheduledTask task = makeTask(TASK_A_ID);
expect(rateLimiter.acquire()).andReturn(0D);
- expect(taskScheduler.schedule(Tasks.id(task))).andAnswer(() -> {
+ expect(taskScheduler.schedule(anyObject(), eq(Tasks.id(task)))).andAnswer(() -> {
// Test a corner case where a task is deleted while it is being evaluated by the task
// scheduler. If not handled carefully, this could result in the scheduler trying again
// later to satisfy the deleted task.
@@ -91,6 +103,7 @@ public class TaskGroupsTest extends EasyMockTest {
return false;
});
+ expectBatchExecute(batchWorker, storageUtil.storage, control, false).anyTimes();
expect(backoffStrategy.calculateBackoffMs(FIRST_SCHEDULE_DELAY.as(Time.MILLISECONDS)))
.andReturn(0L);
@@ -101,10 +114,11 @@ public class TaskGroupsTest extends EasyMockTest {
}
@Test
- public void testEvaluatedOnStartup() {
+ public void testEvaluatedOnStartup() throws Exception {
expect(rateLimiter.acquire()).andReturn(0D);
expect(rescheduleCalculator.getStartupScheduleDelayMs(makeTask(TASK_A_ID))).andReturn(1L);
- expect(taskScheduler.schedule(TASK_A_ID)).andReturn(true);
+ expect(taskScheduler.schedule(anyObject(), eq(TASK_A_ID))).andReturn(true);
+ expectBatchExecute(batchWorker, storageUtil.storage, control, true).anyTimes();
control.replay();
@@ -114,10 +128,11 @@ public class TaskGroupsTest extends EasyMockTest {
}
@Test
- public void testResistStarvation() {
+ public void testResistStarvation() throws Exception {
expect(rateLimiter.acquire()).andReturn(0D).times(2);
- expect(taskScheduler.schedule("a0")).andReturn(true);
- expect(taskScheduler.schedule("b0")).andReturn(true);
+ expect(taskScheduler.schedule(anyObject(), eq("a0"))).andReturn(true);
+ expect(taskScheduler.schedule(anyObject(), eq("b0"))).andReturn(true);
+ expectBatchExecute(batchWorker, storageUtil.storage, control, true).anyTimes();
control.replay();
http://git-wip-us.apache.org/repos/asf/aurora/blob/496397aa/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
index 72562e6..a4e87d2 100644
--- a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
@@ -157,7 +157,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
control.replay();
- assertTrue(scheduler.schedule("a"));
+ assertTrue(scheduler.schedule(storageUtil.mutableStoreProvider, "a"));
}
@Test
@@ -169,7 +169,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
control.replay();
- assertTrue(scheduler.schedule("a"));
+ assertTrue(scheduler.schedule(storageUtil.mutableStoreProvider, "a"));
}
@Test
@@ -201,9 +201,9 @@ public class TaskSchedulerImplTest extends EasyMockTest {
control.replay();
- assertFalse(scheduler.schedule("a"));
- assertFalse(scheduler.schedule("a"));
- assertTrue(scheduler.schedule("a"));
+ assertFalse(scheduler.schedule(storageUtil.mutableStoreProvider, "a"));
+ assertFalse(scheduler.schedule(storageUtil.mutableStoreProvider, "a"));
+ assertTrue(scheduler.schedule(storageUtil.mutableStoreProvider, "a"));
}
@Test
@@ -218,7 +218,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
control.replay();
- assertFalse(scheduler.schedule("a"));
+ assertFalse(scheduler.schedule(storageUtil.mutableStoreProvider, "a"));
}
@Test
@@ -233,7 +233,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
control.replay();
- assertFalse(scheduler.schedule("a"));
+ assertFalse(scheduler.schedule(storageUtil.mutableStoreProvider, "a"));
}
@Test
@@ -281,7 +281,8 @@ public class TaskSchedulerImplTest extends EasyMockTest {
control.replay();
- assertTrue(scheduler.schedule(Tasks.id(taskA)));
+ memStorage.write((NoResult.Quiet)
+ store -> assertTrue(scheduler.schedule(store, Tasks.id(taskA))));
}
@Test
@@ -295,7 +296,7 @@ public class TaskSchedulerImplTest extends EasyMockTest {
control.replay();
- assertFalse(scheduler.schedule("a"));
+ assertFalse(scheduler.schedule(storageUtil.mutableStoreProvider, "a"));
}
private void expectPreemptorCall(IScheduledTask task, Optional<String> result) {