You are viewing a plain-text version of this content. (The link to the canonical version was lost in this plain-text rendering.)
Posted to commits@aurora.apache.org by ma...@apache.org on 2014/10/03 20:17:20 UTC
[2/2] git commit: Implementing update history pruner.
Implementing update history pruner.
Bugs closed: AURORA-743
Reviewed at https://reviews.apache.org/r/26232/
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/9834b316
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/9834b316
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/9834b316
Branch: refs/heads/master
Commit: 9834b316a6f6d6cbcda6ffef733d8fe0d906464b
Parents: de47aba
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Fri Oct 3 11:02:17 2014 -0700
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Fri Oct 3 11:02:17 2014 -0700
----------------------------------------------------------------------
.../aurora/scheduler/async/AsyncModule.java | 42 +-
.../aurora/scheduler/async/HistoryPruner.java | 169 --------
.../scheduler/async/JobUpdateHistoryPruner.java | 100 +++++
.../scheduler/async/TaskHistoryPruner.java | 169 ++++++++
.../scheduler/storage/JobUpdateStore.java | 14 +
.../scheduler/storage/db/DBJobUpdateStore.java | 21 +
.../storage/db/JobUpdateDetailsMapper.java | 40 +-
.../storage/log/WriteAheadStorage.java | 19 +
.../storage/db/JobUpdateDetailsMapper.xml | 83 +++-
.../thrift/org/apache/aurora/gen/storage.thrift | 6 +
.../scheduler/async/HistoryPrunerTest.java | 397 -------------------
.../async/JobUpdateHistoryPrunerTest.java | 63 +++
.../scheduler/async/TaskHistoryPrunerTest.java | 397 +++++++++++++++++++
.../storage/db/DBJobUpdateStoreTest.java | 158 +++++++-
.../scheduler/storage/log/LogStorageTest.java | 27 ++
.../testing/FakeScheduledExecutor.java | 116 ++++++
.../updater/FakeScheduledExecutor.java | 87 ----
.../aurora/scheduler/updater/JobUpdaterIT.java | 3 +-
18 files changed, 1235 insertions(+), 676 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9834b316/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
index aa45d27..c8f7f99 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
@@ -31,6 +31,8 @@ import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.PrivateModule;
import com.google.inject.TypeLiteral;
+
+import com.twitter.common.application.modules.LifecycleModule;
import com.twitter.common.args.Arg;
import com.twitter.common.args.CmdLine;
import com.twitter.common.args.constraints.NotNegative;
@@ -45,11 +47,11 @@ import com.twitter.common.util.TruncatedBinaryBackoff;
import org.apache.aurora.scheduler.async.GcExecutorLauncher.GcExecutorSettings;
import org.apache.aurora.scheduler.async.GcExecutorLauncher.RandomGcExecutorSettings;
-import org.apache.aurora.scheduler.async.HistoryPruner.HistoryPrunnerSettings;
import org.apache.aurora.scheduler.async.OfferQueue.OfferQueueImpl;
import org.apache.aurora.scheduler.async.OfferQueue.OfferReturnDelay;
import org.apache.aurora.scheduler.async.RescheduleCalculator.RescheduleCalculatorImpl;
import org.apache.aurora.scheduler.async.TaskGroups.TaskGroupsSettings;
+import org.apache.aurora.scheduler.async.TaskHistoryPruner.HistoryPrunnerSettings;
import org.apache.aurora.scheduler.async.TaskScheduler.TaskSchedulerImpl;
import org.apache.aurora.scheduler.base.AsyncUtil;
import org.apache.aurora.scheduler.events.PubsubEventModule;
@@ -150,6 +152,20 @@ public class AsyncModule extends AbstractModule {
help = "Enable the preemptor and preemption")
private static final Arg<Boolean> ENABLE_PREEMPTOR = Arg.create(true);
+ @CmdLine(name = "job_update_history_per_job_threshold",
+ help = "Maximum number of completed job updates to retain in a job update history.")
+ private static final Arg<Integer> JOB_UPDATE_HISTORY_PER_JOB_THRESHOLD = Arg.create(10);
+
+ @CmdLine(name = "job_update_history_pruning_interval",
+ help = "Job update history pruning interval.")
+ private static final Arg<Amount<Long, Time>> JOB_UPDATE_HISTORY_PRUNING_INTERVAL =
+ Arg.create(Amount.of(15L, Time.MINUTES));
+
+ @CmdLine(name = "job_update_history_pruning_threshold",
+ help = "Time after which the scheduler will prune completed job update history.")
+ private static final Arg<Amount<Long, Time>> JOB_UPDATE_HISTORY_PRUNING_THRESHOLD =
+ Arg.create(Amount.of(30L, Time.DAYS));
+
private static final Preemptor NULL_PREEMPTOR = new Preemptor() {
@Override
public Optional<String> findPreemptionSlotFor(
@@ -271,11 +287,11 @@ public class AsyncModule extends AbstractModule {
));
bind(ScheduledExecutorService.class).toInstance(executor);
- bind(HistoryPruner.class).in(Singleton.class);
- expose(HistoryPruner.class);
+ bind(TaskHistoryPruner.class).in(Singleton.class);
+ expose(TaskHistoryPruner.class);
}
});
- PubsubEventModule.bindSubscriber(binder(), HistoryPruner.class);
+ PubsubEventModule.bindSubscriber(binder(), TaskHistoryPruner.class);
install(new PrivateModule() {
@Override
@@ -299,6 +315,24 @@ public class AsyncModule extends AbstractModule {
expose(GcExecutorLauncher.class);
}
});
+
+ install(new PrivateModule() {
+ @Override
+ protected void configure() {
+ bind(JobUpdateHistoryPruner.HistoryPrunerSettings.class).toInstance(
+ new JobUpdateHistoryPruner.HistoryPrunerSettings(
+ JOB_UPDATE_HISTORY_PRUNING_INTERVAL.get(),
+ JOB_UPDATE_HISTORY_PRUNING_THRESHOLD.get(),
+ JOB_UPDATE_HISTORY_PER_JOB_THRESHOLD.get()));
+
+ bind(ScheduledExecutorService.class).toInstance(
+ AsyncUtil.singleThreadLoggingScheduledExecutor("JobUpdatePruner-%d", LOG));
+
+ bind(JobUpdateHistoryPruner.class).in(Singleton.class);
+ expose(JobUpdateHistoryPruner.class);
+ }
+ });
+ LifecycleModule.bindStartupAction(binder(), JobUpdateHistoryPruner.class);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9834b316/src/main/java/org/apache/aurora/scheduler/async/HistoryPruner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/HistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/async/HistoryPruner.java
deleted file mode 100644
index ebae58a..0000000
--- a/src/main/java/org/apache/aurora/scheduler/async/HistoryPruner.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async;
-
-import java.util.Collection;
-import java.util.Set;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Logger;
-
-import javax.inject.Inject;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Predicate;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.eventbus.Subscribe;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.util.Clock;
-
-import org.apache.aurora.gen.apiConstants;
-import org.apache.aurora.scheduler.base.Query;
-import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.state.StateManager;
-import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-
-import static java.util.Objects.requireNonNull;
-
-import static org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
-import static org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
-
-/**
- * Prunes tasks in a job based on per-job history and an inactive time threshold by observing tasks
- * transitioning into one of the inactive states.
- */
-public class HistoryPruner implements EventSubscriber {
- private static final Logger LOG = Logger.getLogger(HistoryPruner.class.getName());
-
- private final ScheduledExecutorService executor;
- private final StateManager stateManager;
- private final Clock clock;
- private final HistoryPrunnerSettings settings;
- private final Storage storage;
-
- private final Predicate<IScheduledTask> safeToDelete = new Predicate<IScheduledTask>() {
- @Override
- public boolean apply(IScheduledTask task) {
- return Tasks.getLatestEvent(task).getTimestamp()
- <= clock.nowMillis() - settings.minRetentionThresholdMillis;
- }
- };
-
- static class HistoryPrunnerSettings {
- private final long pruneThresholdMillis;
- private final long minRetentionThresholdMillis;
- private final int perJobHistoryGoal;
-
- HistoryPrunnerSettings(
- Amount<Long, Time> inactivePruneThreshold,
- Amount<Long, Time> minRetentionThreshold,
- int perJobHistoryGoal) {
-
- this.pruneThresholdMillis = inactivePruneThreshold.as(Time.MILLISECONDS);
- this.minRetentionThresholdMillis = minRetentionThreshold.as(Time.MILLISECONDS);
- this.perJobHistoryGoal = perJobHistoryGoal;
- }
- }
-
- @Inject
- HistoryPruner(
- final ScheduledExecutorService executor,
- final StateManager stateManager,
- final Clock clock,
- final HistoryPrunnerSettings settings,
- final Storage storage) {
-
- this.executor = requireNonNull(executor);
- this.stateManager = requireNonNull(stateManager);
- this.clock = requireNonNull(clock);
- this.settings = requireNonNull(settings);
- this.storage = requireNonNull(storage);
- }
-
- @VisibleForTesting
- long calculateTimeout(long taskEventTimestampMillis) {
- return Math.max(
- settings.minRetentionThresholdMillis,
- settings.pruneThresholdMillis - Math.max(0, clock.nowMillis() - taskEventTimestampMillis));
- }
-
- /**
- * When triggered, records an inactive task state change.
- *
- * @param change Event when a task changes state.
- */
- @Subscribe
- public void recordStateChange(TaskStateChange change) {
- if (Tasks.isTerminated(change.getNewState())) {
- long timeoutBasis = change.isTransition()
- ? clock.nowMillis()
- : Iterables.getLast(change.getTask().getTaskEvents()).getTimestamp();
- registerInactiveTask(
- Tasks.SCHEDULED_TO_JOB_KEY.apply(change.getTask()),
- change.getTaskId(),
- calculateTimeout(timeoutBasis));
- }
- }
-
- private void deleteTasks(Set<String> taskIds) {
- LOG.info("Pruning inactive tasks " + taskIds);
- stateManager.deleteTasks(taskIds);
- }
-
- @VisibleForTesting
- static Query.Builder jobHistoryQuery(IJobKey jobKey) {
- return Query.jobScoped(jobKey).byStatus(apiConstants.TERMINAL_STATES);
- }
-
- private void registerInactiveTask(
- final IJobKey jobKey,
- final String taskId,
- long timeRemaining) {
-
- LOG.fine("Prune task " + taskId + " in " + timeRemaining + " ms.");
- executor.schedule(
- new Runnable() {
- @Override
- public void run() {
- LOG.info("Pruning expired inactive task " + taskId);
- deleteTasks(ImmutableSet.of(taskId));
- }
- },
- timeRemaining,
- TimeUnit.MILLISECONDS);
-
- executor.submit(new Runnable() {
- @Override
- public void run() {
- Collection<IScheduledTask> inactiveTasks =
- Storage.Util.weaklyConsistentFetchTasks(storage, jobHistoryQuery(jobKey));
- int tasksToPrune = inactiveTasks.size() - settings.perJobHistoryGoal;
- if (tasksToPrune > 0 && inactiveTasks.size() > settings.perJobHistoryGoal) {
- Set<String> toPrune = FluentIterable
- .from(Tasks.LATEST_ACTIVITY.sortedCopy(inactiveTasks))
- .filter(safeToDelete)
- .limit(tasksToPrune)
- .transform(Tasks.SCHEDULED_TO_ID)
- .toSet();
- deleteTasks(toPrune);
- }
- }
- });
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9834b316/src/main/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPruner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPruner.java
new file mode 100644
index 0000000..0b023a2
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPruner.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async;
+
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.base.Joiner;
+
+import com.twitter.common.base.Command;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.util.Clock;
+
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Prunes per-job update history on a periodic basis.
+ */
+class JobUpdateHistoryPruner implements Command {
+ private static final Logger LOG = Logger.getLogger(JobUpdateHistoryPruner.class.getName());
+
+ private final Clock clock;
+ private final ScheduledExecutorService executor;
+ private final Storage storage;
+ private final HistoryPrunerSettings settings;
+
+ static class HistoryPrunerSettings {
+ private final Amount<Long, Time> pruneInterval;
+ private final Amount<Long, Time> maxHistorySize;
+ private final int maxUpdatesPerJob;
+
+ HistoryPrunerSettings(
+ Amount<Long, Time> pruneInterval,
+ Amount<Long, Time> maxHistorySize,
+ int maxUpdatesPerJob) {
+
+ this.pruneInterval = requireNonNull(pruneInterval);
+ this.maxHistorySize = requireNonNull(maxHistorySize);
+ this.maxUpdatesPerJob = maxUpdatesPerJob;
+ }
+ }
+
+ @Inject
+ JobUpdateHistoryPruner(
+ Clock clock,
+ ScheduledExecutorService executor,
+ Storage storage,
+ HistoryPrunerSettings settings) {
+
+ this.clock = requireNonNull(clock);
+ this.executor = requireNonNull(executor);
+ this.storage = requireNonNull(storage);
+ this.settings = requireNonNull(settings);
+ }
+
+ @Override
+ public void execute() throws RuntimeException {
+ executor.scheduleAtFixedRate(
+ new Runnable() {
+ @Override
+ public void run() {
+ storage.write(new MutateWork.NoResult.Quiet() {
+ @Override
+ public void execute(MutableStoreProvider storeProvider) {
+ Set<String> prunedUpdates = storeProvider.getJobUpdateStore().pruneHistory(
+ settings.maxUpdatesPerJob,
+ clock.nowMillis() - settings.maxHistorySize.as(Time.MILLISECONDS));
+
+ LOG.info(prunedUpdates.isEmpty()
+ ? "No job update history to prune."
+ : "Pruned job update history: " + Joiner.on(",").join(prunedUpdates));
+ }
+ });
+ }
+ },
+ settings.pruneInterval.as(Time.MILLISECONDS),
+ settings.pruneInterval.as(Time.MILLISECONDS),
+ TimeUnit.MILLISECONDS);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9834b316/src/main/java/org/apache/aurora/scheduler/async/TaskHistoryPruner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskHistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/async/TaskHistoryPruner.java
new file mode 100644
index 0000000..345cd89
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskHistoryPruner.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicate;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.eventbus.Subscribe;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.util.Clock;
+
+import org.apache.aurora.gen.apiConstants;
+import org.apache.aurora.scheduler.base.Query;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+
+import static java.util.Objects.requireNonNull;
+
+import static org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
+import static org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+
+/**
+ * Prunes tasks in a job based on per-job history and an inactive time threshold by observing tasks
+ * transitioning into one of the inactive states.
+ */
+public class TaskHistoryPruner implements EventSubscriber {
+ private static final Logger LOG = Logger.getLogger(TaskHistoryPruner.class.getName());
+
+ private final ScheduledExecutorService executor;
+ private final StateManager stateManager;
+ private final Clock clock;
+ private final HistoryPrunnerSettings settings;
+ private final Storage storage;
+
+ private final Predicate<IScheduledTask> safeToDelete = new Predicate<IScheduledTask>() {
+ @Override
+ public boolean apply(IScheduledTask task) {
+ return Tasks.getLatestEvent(task).getTimestamp()
+ <= clock.nowMillis() - settings.minRetentionThresholdMillis;
+ }
+ };
+
+ static class HistoryPrunnerSettings {
+ private final long pruneThresholdMillis;
+ private final long minRetentionThresholdMillis;
+ private final int perJobHistoryGoal;
+
+ HistoryPrunnerSettings(
+ Amount<Long, Time> inactivePruneThreshold,
+ Amount<Long, Time> minRetentionThreshold,
+ int perJobHistoryGoal) {
+
+ this.pruneThresholdMillis = inactivePruneThreshold.as(Time.MILLISECONDS);
+ this.minRetentionThresholdMillis = minRetentionThreshold.as(Time.MILLISECONDS);
+ this.perJobHistoryGoal = perJobHistoryGoal;
+ }
+ }
+
+ @Inject
+ TaskHistoryPruner(
+ final ScheduledExecutorService executor,
+ final StateManager stateManager,
+ final Clock clock,
+ final HistoryPrunnerSettings settings,
+ final Storage storage) {
+
+ this.executor = requireNonNull(executor);
+ this.stateManager = requireNonNull(stateManager);
+ this.clock = requireNonNull(clock);
+ this.settings = requireNonNull(settings);
+ this.storage = requireNonNull(storage);
+ }
+
+ @VisibleForTesting
+ long calculateTimeout(long taskEventTimestampMillis) {
+ return Math.max(
+ settings.minRetentionThresholdMillis,
+ settings.pruneThresholdMillis - Math.max(0, clock.nowMillis() - taskEventTimestampMillis));
+ }
+
+ /**
+ * When triggered, records an inactive task state change.
+ *
+ * @param change Event when a task changes state.
+ */
+ @Subscribe
+ public void recordStateChange(TaskStateChange change) {
+ if (Tasks.isTerminated(change.getNewState())) {
+ long timeoutBasis = change.isTransition()
+ ? clock.nowMillis()
+ : Iterables.getLast(change.getTask().getTaskEvents()).getTimestamp();
+ registerInactiveTask(
+ Tasks.SCHEDULED_TO_JOB_KEY.apply(change.getTask()),
+ change.getTaskId(),
+ calculateTimeout(timeoutBasis));
+ }
+ }
+
+ private void deleteTasks(Set<String> taskIds) {
+ LOG.info("Pruning inactive tasks " + taskIds);
+ stateManager.deleteTasks(taskIds);
+ }
+
+ @VisibleForTesting
+ static Query.Builder jobHistoryQuery(IJobKey jobKey) {
+ return Query.jobScoped(jobKey).byStatus(apiConstants.TERMINAL_STATES);
+ }
+
+ private void registerInactiveTask(
+ final IJobKey jobKey,
+ final String taskId,
+ long timeRemaining) {
+
+ LOG.fine("Prune task " + taskId + " in " + timeRemaining + " ms.");
+ executor.schedule(
+ new Runnable() {
+ @Override
+ public void run() {
+ LOG.info("Pruning expired inactive task " + taskId);
+ deleteTasks(ImmutableSet.of(taskId));
+ }
+ },
+ timeRemaining,
+ TimeUnit.MILLISECONDS);
+
+ executor.submit(new Runnable() {
+ @Override
+ public void run() {
+ Collection<IScheduledTask> inactiveTasks =
+ Storage.Util.weaklyConsistentFetchTasks(storage, jobHistoryQuery(jobKey));
+ int tasksToPrune = inactiveTasks.size() - settings.perJobHistoryGoal;
+ if (tasksToPrune > 0 && inactiveTasks.size() > settings.perJobHistoryGoal) {
+ Set<String> toPrune = FluentIterable
+ .from(Tasks.LATEST_ACTIVITY.sortedCopy(inactiveTasks))
+ .filter(safeToDelete)
+ .limit(tasksToPrune)
+ .transform(Tasks.SCHEDULED_TO_ID)
+ .toSet();
+ deleteTasks(toPrune);
+ }
+ }
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9834b316/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java b/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java
index c3abffe..b7d8d52 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/JobUpdateStore.java
@@ -133,5 +133,19 @@ public interface JobUpdateStore {
* Deletes all updates and update events from the store.
*/
void deleteAllUpdatesAndEvents();
+
+ /**
+ * Prunes (deletes) old completed updates and events from the store.
+ * <p>
+ * At least {@code perJobRetainCount} last completed updates that completed less than
+ * {@code historyPruneThreshold} ago will be kept for every job.
+ *
+ * @param perJobRetainCount Number of completed updates to retain per job.
+ * @param historyPruneThresholdMs Earliest timestamp in the past to retain history.
+ * Any completed updates created before this timestamp
+ * will be pruned.
+ * @return Set of pruned update IDs.
+ */
+ Set<String> pruneHistory(int perJobRetainCount, long historyPruneThresholdMs);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9834b316/src/main/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStore.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStore.java
index 3db0114..d479d20 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStore.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStore.java
@@ -135,6 +135,27 @@ public class DBJobUpdateStore implements JobUpdateStore.Mutable {
detailsMapper.truncate();
}
+ @Timed("job_update_store_prune_history")
+ @Override
+ public Set<String> pruneHistory(int perJobRetainCount, long historyPruneThresholdMs) {
+ ImmutableSet.Builder<String> pruned = ImmutableSet.builder();
+
+ Set<Long> jobKeyIdsToPrune = detailsMapper.selectJobKeysForPruning(
+ perJobRetainCount,
+ historyPruneThresholdMs);
+
+ for (Long jobKeyId : jobKeyIdsToPrune) {
+ Set<String> pruneVictims = detailsMapper.selectPruneVictims(
+ jobKeyId,
+ perJobRetainCount,
+ historyPruneThresholdMs);
+
+ detailsMapper.deleteCompletedUpdates(pruneVictims);
+ pruned.addAll(pruneVictims);
+ }
+ return pruned.build();
+ }
+
@Timed("job_update_store_fetch_summaries")
@Override
public List<IJobUpdateSummary> fetchJobUpdateSummaries(IJobUpdateQuery query) {
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9834b316/src/main/java/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.java b/src/main/java/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.java
index c583e08..60f5359 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.java
@@ -104,6 +104,40 @@ interface JobUpdateDetailsMapper {
void truncate();
/**
+ * Deletes all updates and events with update ID in {@code updateIds}.
+ *
+ * @param updateIds Update IDs to delete.
+ */
+ void deleteCompletedUpdates(@Param("updateIds") Set<String> updateIds);
+
+ /**
+ * Selects all distinct job key IDs associated with at least {@code perJobRetainCount} completed
+ * updates or updates completed before {@code historyPruneThresholdMs}.
+ *
+ * @param perJobRetainCount Number of updates to keep per job.
+ * @param historyPruneThresholdMs History pruning timestamp threshold.
+ * @return Job key IDs.
+ */
+ Set<Long> selectJobKeysForPruning(
+ @Param("retainCount") int perJobRetainCount,
+ @Param("pruneThresholdMs") long historyPruneThresholdMs);
+
+ /**
+ * Groups all updates without a job lock in reverse chronological order of their created times
+ * and deletes anything in excess of {@code perJobRetainCount} or older than
+ * {@code historyPruneThresholdMs}.
+ *
+ * @param jobKeyId Job key ID to select pruning victims for.
+ * @param perJobRetainCount Number of updates to keep per job.
+ * @param historyPruneThresholdMs History pruning timestamp threshold.
+ * @return Update IDs to prune.
+ */
+ Set<String> selectPruneVictims(
+ @Param("keyId") long jobKeyId,
+ @Param("retainCount") int perJobRetainCount,
+ @Param("pruneThresholdMs") long historyPruneThresholdMs);
+
+ /**
* Gets all job update summaries matching the provided {@code query}.
* All {@code query} fields are ANDed together.
*
@@ -116,7 +150,7 @@ interface JobUpdateDetailsMapper {
* Gets details for the provided {@code updateId}.
*
* @param updateId Update ID to get.
- * @return job update details for the provided update ID, if it exists.
+ * @return Job update details for the provided update ID, if it exists.
*/
@Nullable
StoredJobUpdateDetails selectDetails(String updateId);
@@ -125,7 +159,7 @@ interface JobUpdateDetailsMapper {
* Gets job update for the provided {@code updateId}.
*
* @param updateId Update ID to select by.
- * @return job update for the provided update ID, if it exists.
+ * @return Job update for the provided update ID, if it exists.
*/
@Nullable
JobUpdate selectUpdate(String updateId);
@@ -134,7 +168,7 @@ interface JobUpdateDetailsMapper {
* Gets job update instructions for the provided {@code updateId}.
*
* @param updateId Update ID to select by.
- * @return job update instructions for the provided update ID, if it exists.
+ * @return Job update instructions for the provided update ID, if it exists.
*/
@Nullable
JobUpdateInstructions selectInstructions(String updateId);
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9834b316/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
index 66c9164..094d1c6 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java
@@ -28,6 +28,7 @@ import com.twitter.common.inject.TimedInterceptor.Timed;
import org.apache.aurora.gen.MaintenanceMode;
import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.gen.storage.PruneJobUpdateHistory;
import org.apache.aurora.gen.storage.RemoveJob;
import org.apache.aurora.gen.storage.RemoveLock;
import org.apache.aurora.gen.storage.RemoveQuota;
@@ -308,6 +309,24 @@ class WriteAheadStorage extends ForwardingStore implements
}
@Override
+ public Set<String> pruneHistory(int perJobRetainCount, long historyPruneThresholdMs) {
+ Set<String> prunedUpdates = jobUpdateStore.pruneHistory(
+ perJobRetainCount,
+ historyPruneThresholdMs);
+
+ if (!prunedUpdates.isEmpty()) {
+ // Pruned updates will eventually go away from persisted storage when a new snapshot is cut.
+ // So, persisting pruning attempts is not strictly necessary as the periodic pruner will
+ // provide eventual consistency between volatile and persistent storage upon scheduler
+ // restart. By generating an out of band pruning during log replay the consistency is
+ // achieved sooner without potentially exposing pruned but not yet persisted data.
+ write(Op.pruneJobUpdateHistory(
+ new PruneJobUpdateHistory(perJobRetainCount, historyPruneThresholdMs)));
+ }
+ return prunedUpdates;
+ }
+
+ @Override
public void deleteAllTasks() {
throw new UnsupportedOperationException(
"Unsupported since casual storage users should never be doing this.");
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9834b316/src/main/resources/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.xml
----------------------------------------------------------------------
diff --git a/src/main/resources/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.xml b/src/main/resources/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.xml
index 2a81a94..77032d8 100644
--- a/src/main/resources/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.xml
+++ b/src/main/resources/org/apache/aurora/scheduler/storage/db/JobUpdateDetailsMapper.xml
@@ -199,7 +199,7 @@
</collection>
</resultMap>
- <sql id="timestamps_inner_joins">
+ <sql id="status_inner_join">
INNER JOIN
(
SELECT
@@ -215,6 +215,9 @@
GROUP BY update_id
) AS e_t ON e_t.update_id = e_s.update_id AND e_t.timestamp_ms = e_s.timestamp_ms
) AS max_status ON max_status.update_id = u.id
+ </sql>
+
+ <sql id="created_timestamp_inner_join">
INNER JOIN
(
SELECT
@@ -223,6 +226,9 @@
FROM job_update_events
GROUP BY update_id
) AS min_ts ON min_ts.update_id = u.id
+ </sql>
+
+ <sql id="last_updated_timestamp_inner_join">
INNER JOIN
(
SELECT
@@ -244,6 +250,12 @@
) AS max_ts ON max_ts.update_id = u.id
</sql>
+ <sql id="timestamps_inner_joins">
+ <include refid="status_inner_join" />
+ <include refid="created_timestamp_inner_join" />
+ <include refid="last_updated_timestamp_inner_join" />
+ </sql>
+
<select id="selectSummaries" resultMap="jobUpdateSummaryMap">
SELECT
u.update_id AS update_id,
@@ -320,8 +332,7 @@
io.last AS jui_juse_r_last
</sql>
- <sql id="job_update_inner_joins">
- FROM job_updates AS u
+ <sql id="job_key_inner_join">
INNER JOIN job_keys AS j ON j.id = u.job_key_id
</sql>
@@ -333,6 +344,10 @@
LEFT OUTER JOIN job_updates_to_instance_overrides AS io ON io.update_id = u.id
</sql>
+ <!-- Left-joins job_update_locks (alias l); a NULL l.id marks an update with no lock row. -->
+ <sql id="lock_outer_join">
+ LEFT OUTER JOIN job_update_locks AS l on l.update_id = u.id
+ </sql>
+
<sql id="unscoped_details_select">
SELECT
<include refid="job_update_columns" />,
@@ -345,12 +360,13 @@
i.instance_id AS i_instance_id,
i.timestamp_ms AS i_timestamp_ms,
l.lock_token AS lock_token
- <include refid="job_update_inner_joins" />
+ FROM job_updates AS u
+ <include refid="job_key_inner_join" />
<include refid="timestamps_inner_joins" />
<include refid="job_update_outer_joins" />
LEFT OUTER JOIN job_update_events AS e ON e.update_id = u.id
LEFT OUTER JOIN job_instance_update_events AS i ON i.update_id = u.id
- LEFT OUTER JOIN job_update_locks AS l on l.update_id = u.id
+ <include refid="lock_outer_join" />
</sql>
<!--Ideally, update instruction columns could be derived from job_update_columns above but that
@@ -380,7 +396,8 @@
io.id AS juse_r_id,
io.first AS juse_r_first,
io.last AS juse_r_last
- <include refid="job_update_inner_joins" />
+ FROM job_updates AS u
+ <include refid="job_key_inner_join" />
<include refid="job_update_outer_joins" />
WHERE u.update_id = #{id}
</select>
@@ -388,7 +405,8 @@
<select id="selectUpdate" resultMap="jobUpdateMap">
SELECT
<include refid="job_update_columns" />
- <include refid="job_update_inner_joins" />
+ FROM job_updates AS u
+ <include refid="job_key_inner_join" />
<include refid="timestamps_inner_joins" />
<include refid="job_update_outer_joins" />
WHERE u.update_id = #{id}
@@ -427,4 +445,67 @@
<delete id="truncate">
DELETE FROM job_updates;
</delete>
+
+ <!-- Selects ids of job keys that have prunable updates: updates with no job_update_locks row
+ that either number more than retainCount for the job key, or were created (min_ts from
+ created_timestamp_inner_join) before pruneThresholdMs. Less-than comparisons are written as
+ escaped entities because a raw less-than sign is not allowed in XML character data. -->
+ <select id="selectJobKeysForPruning" resultType="long">
+ SELECT DISTINCT
+ u.job_key_id
+ FROM job_updates as u
+ <include refid="created_timestamp_inner_join" />
+ <include refid="lock_outer_join" />
+ WHERE l.id IS NULL
+ GROUP BY u.job_key_id
+ HAVING COUNT(u.job_key_id) > #{retainCount}
+ UNION
+ SELECT DISTINCT
+ u.job_key_id
+ FROM job_updates as u
+ <include refid="created_timestamp_inner_join" />
+ <include refid="lock_outer_join" />
+ WHERE min_ts.timestamp_ms &lt; #{pruneThresholdMs} AND l.id IS NULL
+ </select>
+
+ <!-- Selects update ids for one job key that have no job_update_locks row and either fall
+ beyond the retainCount most recently created updates (OFFSET over min_ts descending) or were
+ created at or before pruneThresholdMs.
+ NOTE(review): this threshold test is inclusive while selectJobKeysForPruning is strict, so an
+ update created exactly at pruneThresholdMs is only pruned when its job key is selected for
+ another reason. Confirm which comparison is intended. -->
+ <select id="selectPruneVictims" resultType="String">
+ SELECT id FROM
+ (
+ SELECT
+ u.update_id as id
+ FROM job_updates as u
+ <include refid="created_timestamp_inner_join" />
+ <include refid="lock_outer_join" />
+ WHERE u.job_key_id = #{keyId}
+ AND l.id IS NULL
+ ORDER BY min_ts.timestamp_ms DESC
+ LIMIT NULL
+ OFFSET #{retainCount}
+ )
+ UNION
+ SELECT
+ u.update_id as id
+ FROM job_updates as u
+ <include refid="created_timestamp_inner_join" />
+ <include refid="lock_outer_join" />
+ WHERE u.job_key_id = #{keyId}
+ AND min_ts.timestamp_ms &lt;= #{pruneThresholdMs}
+ AND l.id IS NULL
+ </select>
+
+ <!-- Deletes the job_updates rows whose update_id is in updateIds. The statement itself does not
+ check status or locks; presumably callers pass ids obtained from selectPruneVictims. -->
+ <delete id="deleteCompletedUpdates">
+ DELETE FROM job_updates
+ WHERE update_id IN
+ <foreach item="element" collection="updateIds" open="(" separator="," close=")">
+ #{element}
+ </foreach>
+ </delete>
</mapper>
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9834b316/src/main/thrift/org/apache/aurora/gen/storage.thrift
----------------------------------------------------------------------
diff --git a/src/main/thrift/org/apache/aurora/gen/storage.thrift b/src/main/thrift/org/apache/aurora/gen/storage.thrift
index 7e50245..5350ec9 100644
--- a/src/main/thrift/org/apache/aurora/gen/storage.thrift
+++ b/src/main/thrift/org/apache/aurora/gen/storage.thrift
@@ -89,6 +89,11 @@ struct SaveJobInstanceUpdateEvent {
2: string updateId
}
+// Log op for a job update history prune. WriteAheadStorage records it via
+// write(Op.pruneJobUpdateHistory(...)) with the perJobRetainCount and historyPruneThresholdMs
+// values it pruned with (presumably so that log replay can repeat the prune).
+struct PruneJobUpdateHistory {
+ // Presumably the number of most recent updates retained per job (verify against JobUpdateStore).
+ 1: i32 perJobRetainCount
+ // Presumably an epoch-millis cutoff; older updates are prunable (verify against JobUpdateStore).
+ 2: i64 historyPruneThresholdMs
+}
+
union Op {
1: SaveFrameworkId saveFrameworkId
2: SaveAcceptedJob saveAcceptedJob
@@ -104,6 +109,7 @@ union Op {
14: SaveJobUpdate saveJobUpdate
15: SaveJobUpdateEvent saveJobUpdateEvent
16: SaveJobInstanceUpdateEvent saveJobInstanceUpdateEvent
+ 17: PruneJobUpdateHistory pruneJobUpdateHistory
}
// The current schema version ID. This should be incremented each time the
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9834b316/src/test/java/org/apache/aurora/scheduler/async/HistoryPrunerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/HistoryPrunerTest.java b/src/test/java/org/apache/aurora/scheduler/async/HistoryPrunerTest.java
deleted file mode 100644
index 011d9ec..0000000
--- a/src/test/java/org/apache/aurora/scheduler/async/HistoryPrunerTest.java
+++ /dev/null
@@ -1,397 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.twitter.common.base.Command;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.testing.easymock.EasyMockTest;
-import com.twitter.common.util.testing.FakeClock;
-
-import org.apache.aurora.gen.AssignedTask;
-import org.apache.aurora.gen.ExecutorConfig;
-import org.apache.aurora.gen.Identity;
-import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.gen.ScheduledTask;
-import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.gen.TaskEvent;
-import org.apache.aurora.scheduler.async.HistoryPruner.HistoryPrunnerSettings;
-import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
-import org.apache.aurora.scheduler.state.StateManager;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
-import org.easymock.Capture;
-import org.easymock.EasyMock;
-import org.easymock.IAnswer;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
-import static org.apache.aurora.gen.ScheduleStatus.FINISHED;
-import static org.apache.aurora.gen.ScheduleStatus.KILLED;
-import static org.apache.aurora.gen.ScheduleStatus.LOST;
-import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
-import static org.apache.aurora.gen.ScheduleStatus.SANDBOX_DELETED;
-import static org.apache.aurora.gen.ScheduleStatus.STARTING;
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.junit.Assert.fail;
-
-public class HistoryPrunerTest extends EasyMockTest {
- private static final String JOB_A = "job-a";
- private static final String TASK_ID = "task_id";
- private static final String SLAVE_HOST = "HOST_A";
- private static final Amount<Long, Time> ONE_MS = Amount.of(1L, Time.MILLISECONDS);
- private static final Amount<Long, Time> ONE_MINUTE = Amount.of(1L, Time.MINUTES);
- private static final Amount<Long, Time> ONE_DAY = Amount.of(1L, Time.DAYS);
- private static final Amount<Long, Time> ONE_HOUR = Amount.of(1L, Time.HOURS);
- private static final int PER_JOB_HISTORY = 2;
-
- private ScheduledFuture<?> future;
- private ScheduledExecutorService executor;
- private FakeClock clock;
- private StateManager stateManager;
- private StorageTestUtil storageUtil;
- private HistoryPruner pruner;
-
- @Before
- public void setUp() {
- future = createMock(new Clazz<ScheduledFuture<?>>() { });
- executor = createMock(ScheduledExecutorService.class);
- clock = new FakeClock();
- stateManager = createMock(StateManager.class);
- storageUtil = new StorageTestUtil(this);
- storageUtil.expectOperations();
- pruner = new HistoryPruner(
- executor,
- stateManager,
- clock,
- new HistoryPrunnerSettings(ONE_DAY, ONE_MINUTE, PER_JOB_HISTORY),
- storageUtil.storage);
- }
-
- @Test
- public void testNoPruning() {
- long taskATimestamp = clock.nowMillis();
- IScheduledTask a = makeTask("a", FINISHED);
-
- clock.advance(ONE_MS);
- long taskBTimestamp = clock.nowMillis();
- IScheduledTask b = makeTask("b", SANDBOX_DELETED);
-
- expectNoImmediatePrune(ImmutableSet.of(a));
- expectOneDelayedPrune(taskATimestamp);
- expectNoImmediatePrune(ImmutableSet.of(a, b));
- expectOneDelayedPrune(taskBTimestamp);
-
- control.replay();
-
- pruner.recordStateChange(TaskStateChange.initialized(a));
- pruner.recordStateChange(TaskStateChange.initialized(b));
- }
-
- @Test
- public void testStorageStartedWithPruning() {
- long taskATimestamp = clock.nowMillis();
- IScheduledTask a = makeTask("a", SANDBOX_DELETED);
-
- clock.advance(ONE_MINUTE);
- long taskBTimestamp = clock.nowMillis();
- IScheduledTask b = makeTask("b", LOST);
-
- clock.advance(ONE_MINUTE);
- long taskCTimestamp = clock.nowMillis();
- IScheduledTask c = makeTask("c", FINISHED);
-
- clock.advance(ONE_MINUTE);
- IScheduledTask d = makeTask("d", FINISHED);
- IScheduledTask e = makeTask("job-x", "e", FINISHED);
-
- expectNoImmediatePrune(ImmutableSet.of(a));
- expectOneDelayedPrune(taskATimestamp);
- expectNoImmediatePrune(ImmutableSet.of(a, b));
- expectOneDelayedPrune(taskBTimestamp);
- expectImmediatePrune(ImmutableSet.of(a, b, c), a);
- expectOneDelayedPrune(taskCTimestamp);
- expectImmediatePrune(ImmutableSet.of(b, c, d), b);
- expectDefaultDelayedPrune();
- expectNoImmediatePrune(ImmutableSet.of(e));
- expectDefaultDelayedPrune();
-
- control.replay();
-
- for (IScheduledTask task : ImmutableList.of(a, b, c, d, e)) {
- pruner.recordStateChange(TaskStateChange.initialized(task));
- }
- }
-
- @Test
- public void testStateChange() {
- IScheduledTask starting = makeTask("a", STARTING);
- IScheduledTask running = copy(starting, RUNNING);
- IScheduledTask killed = copy(starting, KILLED);
-
- expectNoImmediatePrune(ImmutableSet.of(killed));
- expectDefaultDelayedPrune();
-
- control.replay();
-
- // No future set for non-terminal state transition.
- changeState(starting, running);
-
- // Future set for terminal state transition.
- changeState(running, killed);
- }
-
- @Test
- public void testActivateFutureAndExceedHistoryGoal() {
- IScheduledTask running = makeTask("a", RUNNING);
- IScheduledTask killed = copy(running, KILLED);
- expectNoImmediatePrune(ImmutableSet.of(running));
- Capture<Runnable> delayedDelete = expectDefaultDelayedPrune();
-
- // Expect task "a" to be pruned when future is activated.
- expectDeleteTasks("a");
-
- control.replay();
-
- // Capture future for inactive task "a"
- changeState(running, killed);
- clock.advance(ONE_HOUR);
- // Execute future to prune task "a" from the system.
- delayedDelete.getValue().run();
- }
-
- @Test
- public void testJobHistoryExceeded() {
- IScheduledTask a = makeTask("a", RUNNING);
- clock.advance(ONE_MS);
- IScheduledTask aKilled = copy(a, KILLED);
-
- IScheduledTask b = makeTask("b", RUNNING);
- clock.advance(ONE_MS);
- IScheduledTask bKilled = copy(b, KILLED);
-
- IScheduledTask c = makeTask("c", RUNNING);
- clock.advance(ONE_MS);
- IScheduledTask cLost = copy(c, LOST);
-
- IScheduledTask d = makeTask("d", RUNNING);
- clock.advance(ONE_MS);
- IScheduledTask dLost = copy(d, LOST);
-
- expectNoImmediatePrune(ImmutableSet.of(a));
- expectDefaultDelayedPrune();
- expectNoImmediatePrune(ImmutableSet.of(a, b));
- expectDefaultDelayedPrune();
- expectNoImmediatePrune(ImmutableSet.of(a, b)); // no pruning yet due to min threshold
- expectDefaultDelayedPrune();
- clock.advance(ONE_HOUR);
- expectImmediatePrune(ImmutableSet.of(a, b, c, d), a, b); // now prune 2 tasks
- expectDefaultDelayedPrune();
-
- control.replay();
-
- changeState(a, aKilled);
- changeState(b, bKilled);
- changeState(c, cLost);
- changeState(d, dLost);
- }
-
- // TODO(William Farner): Consider removing the thread safety tests. Now that intrinsic locks
- // are not used, it is rather awkward to test this.
- @Test
- public void testThreadSafeStateChangeEvent() throws Exception {
- // This tests against regression where an executor pruning a task holds an intrinsic lock and
- // an unrelated task state change in the scheduler fires an event that requires this intrinsic
- // lock. This causes a deadlock when the executor tries to acquire a lock held by the event
- // fired.
-
- pruner = prunerWithRealExecutor();
- Command onDeleted = new Command() {
- @Override
- public void execute() {
- // The goal is to verify that the call does not deadlock. We do not care about the outcome.
- IScheduledTask b = makeTask("b", ASSIGNED);
-
- changeState(b, STARTING);
- }
- };
- CountDownLatch taskDeleted = expectTaskDeleted(onDeleted, TASK_ID);
-
- control.replay();
-
- // Change the task to a terminal state and wait for it to be pruned.
- changeState(makeTask(TASK_ID, RUNNING), KILLED);
- taskDeleted.await();
- }
-
- private HistoryPruner prunerWithRealExecutor() {
- ScheduledExecutorService realExecutor = Executors.newScheduledThreadPool(1,
- new ThreadFactoryBuilder()
- .setDaemon(true)
- .setNameFormat("testThreadSafeEvents-executor")
- .build());
- return new HistoryPruner(
- realExecutor,
- stateManager,
- clock,
- new HistoryPrunnerSettings(Amount.of(1L, Time.MILLISECONDS), ONE_MS, PER_JOB_HISTORY),
- storageUtil.storage);
- }
-
- private CountDownLatch expectTaskDeleted(final Command onDelete, String taskId) {
- final CountDownLatch deleteCalled = new CountDownLatch(1);
- final CountDownLatch eventDelivered = new CountDownLatch(1);
-
- Thread eventDispatch = new Thread() {
- @Override
- public void run() {
- try {
- deleteCalled.await();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- fail("Interrupted while awaiting for delete call.");
- return;
- }
- onDelete.execute();
- eventDelivered.countDown();
- }
- };
- eventDispatch.setDaemon(true);
- eventDispatch.setName(getClass().getName() + "-EventDispatch");
- eventDispatch.start();
-
- stateManager.deleteTasks(ImmutableSet.of(taskId));
- expectLastCall().andAnswer(new IAnswer<Void>() {
- @Override
- public Void answer() {
- deleteCalled.countDown();
- try {
- eventDelivered.await();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- fail("Interrupted while awaiting for event delivery.");
- }
- return null;
- }
- });
-
- return eventDelivered;
- }
-
- private void expectDeleteTasks(String... tasks) {
- stateManager.deleteTasks(ImmutableSet.copyOf(tasks));
- }
-
- private Capture<Runnable> expectDefaultDelayedPrune() {
- return expectDelayedPrune(ONE_DAY.as(Time.MILLISECONDS), 1);
- }
-
- private Capture<Runnable> expectOneDelayedPrune(long timestampMillis) {
- return expectDelayedPrune(timestampMillis, 1);
- }
-
- private void expectNoImmediatePrune(ImmutableSet<IScheduledTask> tasksInJob) {
- expectImmediatePrune(tasksInJob);
- }
-
- private void expectImmediatePrune(
- ImmutableSet<IScheduledTask> tasksInJob,
- IScheduledTask... pruned) {
-
- // Expect a deferred prune operation when a new task is being watched.
- executor.submit(EasyMock.<Runnable>anyObject());
- expectLastCall().andAnswer(
- new IAnswer<Future<?>>() {
- @Override
- public Future<?> answer() {
- Runnable work = (Runnable) EasyMock.getCurrentArguments()[0];
- work.run();
- return null;
- }
- }
- );
-
- IJobKey jobKey = Iterables.getOnlyElement(
- FluentIterable.from(tasksInJob).transform(Tasks.SCHEDULED_TO_JOB_KEY).toSet());
- storageUtil.expectTaskFetch(HistoryPruner.jobHistoryQuery(jobKey), tasksInJob);
- if (pruned.length > 0) {
- stateManager.deleteTasks(Tasks.ids(pruned));
- }
- }
-
- private Capture<Runnable> expectDelayedPrune(long timestampMillis, int count) {
- Capture<Runnable> capture = createCapture();
- executor.schedule(
- EasyMock.capture(capture),
- eq(pruner.calculateTimeout(timestampMillis)),
- eq(TimeUnit.MILLISECONDS));
- expectLastCall().andReturn(future).times(count);
- return capture;
- }
-
- private void changeState(IScheduledTask oldStateTask, IScheduledTask newStateTask) {
- pruner.recordStateChange(TaskStateChange.transition(newStateTask, oldStateTask.getStatus()));
- }
-
- private void changeState(IScheduledTask oldStateTask, ScheduleStatus status) {
- pruner.recordStateChange(
- TaskStateChange.transition(copy(oldStateTask, status), oldStateTask.getStatus()));
- }
-
- private IScheduledTask copy(IScheduledTask task, ScheduleStatus status) {
- return IScheduledTask.build(task.newBuilder().setStatus(status));
- }
-
- private IScheduledTask makeTask(
- String job,
- String taskId,
- ScheduleStatus status) {
-
- return IScheduledTask.build(new ScheduledTask()
- .setStatus(status)
- .setTaskEvents(ImmutableList.of(new TaskEvent(clock.nowMillis(), status)))
- .setAssignedTask(makeAssignedTask(job, taskId)));
- }
-
- private IScheduledTask makeTask(String taskId, ScheduleStatus status) {
- return makeTask(JOB_A, taskId, status);
- }
-
- private AssignedTask makeAssignedTask(String job, String taskId) {
- return new AssignedTask()
- .setSlaveHost(SLAVE_HOST)
- .setTaskId(taskId)
- .setTask(new TaskConfig()
- .setOwner(new Identity().setRole("role").setUser("user"))
- .setEnvironment("staging45")
- .setJobName(job)
- .setExecutorConfig(new ExecutorConfig("aurora", "config")));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9834b316/src/test/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPrunerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPrunerTest.java b/src/test/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPrunerTest.java
new file mode 100644
index 0000000..748aac8
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/async/JobUpdateHistoryPrunerTest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.google.common.collect.ImmutableSet;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.testing.easymock.EasyMockTest;
+import com.twitter.common.util.Clock;
+
+import org.apache.aurora.scheduler.async.JobUpdateHistoryPruner.HistoryPrunerSettings;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.apache.aurora.scheduler.testing.FakeScheduledExecutor;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.expect;
+
+/**
+ * Tests for {@link JobUpdateHistoryPruner}: after {@code execute()}, two 1 ms advances of the fake
+ * executor are expected to produce two {@code pruneHistory} calls on the job update store.
+ */
+public class JobUpdateHistoryPrunerTest extends EasyMockTest {
+ @Test
+ public void testExecution() throws Exception {
+ StorageTestUtil storageUtil = new StorageTestUtil(this);
+ storageUtil.expectOperations();
+
+ final ScheduledExecutorService executor = createMock(ScheduledExecutorService.class);
+ // Fake executor driven by advance(); the 2 presumably bounds the number of fixed-rate runs it
+ // accepts (verify against FakeScheduledExecutor.scheduleAtFixedRateExecutor).
+ FakeScheduledExecutor executorClock =
+ FakeScheduledExecutor.scheduleAtFixedRateExecutor(executor, 2);
+
+ // One clock read per prune run. With now = 2 and 1 ms settings the store is expected to be
+ // called as pruneHistory(1, 1) (presumably retain count 1 and threshold 2 - 1 = 1).
+ Clock mockClock = createMock(Clock.class);
+ expect(mockClock.nowMillis()).andReturn(2L).times(2);
+
+ // The first run prunes two updates; the second run prunes nothing.
+ expect(storageUtil.jobUpdateStore.pruneHistory(1, 1)).andReturn(ImmutableSet.of("id1", "id2"));
+ expect(storageUtil.jobUpdateStore.pruneHistory(1, 1)).andReturn(ImmutableSet.<String>of());
+
+ control.replay();
+
+ // Presumably asserts that nothing is scheduled before the pruner is started.
+ executorClock.assertEmpty();
+ JobUpdateHistoryPruner pruner = new JobUpdateHistoryPruner(
+ mockClock,
+ executor,
+ storageUtil.storage,
+ new HistoryPrunerSettings(
+ Amount.of(1L, Time.MILLISECONDS),
+ Amount.of(1L, Time.MILLISECONDS),
+ 1));
+
+ pruner.execute();
+ // Each 1 ms advance is expected to fire one scheduled prune run.
+ executorClock.advance(Amount.of(1L, Time.MILLISECONDS));
+ executorClock.advance(Amount.of(1L, Time.MILLISECONDS));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9834b316/src/test/java/org/apache/aurora/scheduler/async/TaskHistoryPrunerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskHistoryPrunerTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskHistoryPrunerTest.java
new file mode 100644
index 0000000..53d2c6b
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskHistoryPrunerTest.java
@@ -0,0 +1,397 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.twitter.common.base.Command;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.testing.easymock.EasyMockTest;
+import com.twitter.common.util.testing.FakeClock;
+
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.ExecutorConfig;
+import org.apache.aurora.gen.Identity;
+import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.gen.TaskEvent;
+import org.apache.aurora.scheduler.async.TaskHistoryPruner.HistoryPrunnerSettings;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
+import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
+import static org.apache.aurora.gen.ScheduleStatus.FINISHED;
+import static org.apache.aurora.gen.ScheduleStatus.KILLED;
+import static org.apache.aurora.gen.ScheduleStatus.LOST;
+import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
+import static org.apache.aurora.gen.ScheduleStatus.SANDBOX_DELETED;
+import static org.apache.aurora.gen.ScheduleStatus.STARTING;
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link TaskHistoryPruner}. The executor, {@link StateManager} and storage are mocks, so
+ * each test records the expected submit/schedule/fetch/deleteTasks calls before control.replay().
+ */
+public class TaskHistoryPrunerTest extends EasyMockTest {
+ private static final String JOB_A = "job-a";
+ private static final String TASK_ID = "task_id";
+ private static final String SLAVE_HOST = "HOST_A";
+ private static final Amount<Long, Time> ONE_MS = Amount.of(1L, Time.MILLISECONDS);
+ private static final Amount<Long, Time> ONE_MINUTE = Amount.of(1L, Time.MINUTES);
+ private static final Amount<Long, Time> ONE_DAY = Amount.of(1L, Time.DAYS);
+ private static final Amount<Long, Time> ONE_HOUR = Amount.of(1L, Time.HOURS);
+ private static final int PER_JOB_HISTORY = 2;
+
+ private ScheduledFuture<?> future;
+ private ScheduledExecutorService executor;
+ private FakeClock clock;
+ private StateManager stateManager;
+ private StorageTestUtil storageUtil;
+ private TaskHistoryPruner pruner;
+
+ // Builds the shared pruner over mock collaborators and a FakeClock.
+ // testThreadSafeStateChangeEvent replaces it with a pruner on a real executor.
+ @Before
+ public void setUp() {
+ future = createMock(new Clazz<ScheduledFuture<?>>() { });
+ executor = createMock(ScheduledExecutorService.class);
+ clock = new FakeClock();
+ stateManager = createMock(StateManager.class);
+ storageUtil = new StorageTestUtil(this);
+ storageUtil.expectOperations();
+ pruner = new TaskHistoryPruner(
+ executor,
+ stateManager,
+ clock,
+ new HistoryPrunnerSettings(ONE_DAY, ONE_MINUTE, PER_JOB_HISTORY),
+ storageUtil.storage);
+ }
+
+ // Two terminal tasks in job-a (PER_JOB_HISTORY is 2): no immediate deletion is expected, and
+ // each task gets a delayed prune computed from its own event timestamp.
+ @Test
+ public void testNoPruning() {
+ long taskATimestamp = clock.nowMillis();
+ IScheduledTask a = makeTask("a", FINISHED);
+
+ clock.advance(ONE_MS);
+ long taskBTimestamp = clock.nowMillis();
+ IScheduledTask b = makeTask("b", SANDBOX_DELETED);
+
+ expectNoImmediatePrune(ImmutableSet.of(a));
+ expectOneDelayedPrune(taskATimestamp);
+ expectNoImmediatePrune(ImmutableSet.of(a, b));
+ expectOneDelayedPrune(taskBTimestamp);
+
+ control.replay();
+
+ pruner.recordStateChange(TaskStateChange.initialized(a));
+ pruner.recordStateChange(TaskStateChange.initialized(b));
+ }
+
+ // Five tasks are initialized already terminal. Once the fetched job-a history holds three
+ // tasks, the oldest one is expected to be deleted immediately. Task "e" belongs to job-x and
+ // is only scheduled for a delayed prune.
+ @Test
+ public void testStorageStartedWithPruning() {
+ long taskATimestamp = clock.nowMillis();
+ IScheduledTask a = makeTask("a", SANDBOX_DELETED);
+
+ clock.advance(ONE_MINUTE);
+ long taskBTimestamp = clock.nowMillis();
+ IScheduledTask b = makeTask("b", LOST);
+
+ clock.advance(ONE_MINUTE);
+ long taskCTimestamp = clock.nowMillis();
+ IScheduledTask c = makeTask("c", FINISHED);
+
+ clock.advance(ONE_MINUTE);
+ IScheduledTask d = makeTask("d", FINISHED);
+ IScheduledTask e = makeTask("job-x", "e", FINISHED);
+
+ expectNoImmediatePrune(ImmutableSet.of(a));
+ expectOneDelayedPrune(taskATimestamp);
+ expectNoImmediatePrune(ImmutableSet.of(a, b));
+ expectOneDelayedPrune(taskBTimestamp);
+ expectImmediatePrune(ImmutableSet.of(a, b, c), a);
+ expectOneDelayedPrune(taskCTimestamp);
+ expectImmediatePrune(ImmutableSet.of(b, c, d), b);
+ expectDefaultDelayedPrune();
+ expectNoImmediatePrune(ImmutableSet.of(e));
+ expectDefaultDelayedPrune();
+
+ control.replay();
+
+ for (IScheduledTask task : ImmutableList.of(a, b, c, d, e)) {
+ pruner.recordStateChange(TaskStateChange.initialized(task));
+ }
+ }
+
+ // Only the transition into a terminal state is expected to schedule pruning work.
+ @Test
+ public void testStateChange() {
+ IScheduledTask starting = makeTask("a", STARTING);
+ IScheduledTask running = copy(starting, RUNNING);
+ IScheduledTask killed = copy(starting, KILLED);
+
+ expectNoImmediatePrune(ImmutableSet.of(killed));
+ expectDefaultDelayedPrune();
+
+ control.replay();
+
+ // No future set for non-terminal state transition.
+ changeState(starting, running);
+
+ // Future set for terminal state transition.
+ changeState(running, killed);
+ }
+
+ // A terminal transition schedules a delayed prune. Running the captured runnable later is
+ // expected to delete the task.
+ @Test
+ public void testActivateFutureAndExceedHistoryGoal() {
+ IScheduledTask running = makeTask("a", RUNNING);
+ IScheduledTask killed = copy(running, KILLED);
+ expectNoImmediatePrune(ImmutableSet.of(running));
+ Capture<Runnable> delayedDelete = expectDefaultDelayedPrune();
+
+ // Expect task "a" to be pruned when future is activated.
+ expectDeleteTasks("a");
+
+ control.replay();
+
+ // Capture future for inactive task "a"
+ changeState(running, killed);
+ clock.advance(ONE_HOUR);
+ // Execute future to prune task "a" from the system.
+ delayedDelete.getValue().run();
+ }
+
+ // Four tasks become terminal. The last prune sees four tasks in the job and is expected to
+ // delete the two oldest (a and b).
+ @Test
+ public void testJobHistoryExceeded() {
+ IScheduledTask a = makeTask("a", RUNNING);
+ clock.advance(ONE_MS);
+ IScheduledTask aKilled = copy(a, KILLED);
+
+ IScheduledTask b = makeTask("b", RUNNING);
+ clock.advance(ONE_MS);
+ IScheduledTask bKilled = copy(b, KILLED);
+
+ IScheduledTask c = makeTask("c", RUNNING);
+ clock.advance(ONE_MS);
+ IScheduledTask cLost = copy(c, LOST);
+
+ IScheduledTask d = makeTask("d", RUNNING);
+ clock.advance(ONE_MS);
+ IScheduledTask dLost = copy(d, LOST);
+
+ expectNoImmediatePrune(ImmutableSet.of(a));
+ expectDefaultDelayedPrune();
+ expectNoImmediatePrune(ImmutableSet.of(a, b));
+ expectDefaultDelayedPrune();
+ expectNoImmediatePrune(ImmutableSet.of(a, b)); // no pruning yet due to min threshold
+ expectDefaultDelayedPrune();
+ // NOTE(review): this advance runs while expectations are recorded, before control.replay().
+ // All four changeState calls below therefore execute with the clock already one hour ahead.
+ // Whether a prune deletes anything is then driven by the task sets the mocked fetch returns
+ // (c's fetch returns only {a, b}), so the "min threshold" remark above may not describe what
+ // is exercised. Verify intent.
+ clock.advance(ONE_HOUR);
+ expectImmediatePrune(ImmutableSet.of(a, b, c, d), a, b); // now prune 2 tasks
+ expectDefaultDelayedPrune();
+
+ control.replay();
+
+ changeState(a, aKilled);
+ changeState(b, bKilled);
+ changeState(c, cLost);
+ changeState(d, dLost);
+ }
+
+ // TODO(William Farner): Consider removing the thread safety tests. Now that intrinsic locks
+ // are not used, it is rather awkward to test this.
+ @Test
+ public void testThreadSafeStateChangeEvent() throws Exception {
+ // This tests against regression where an executor pruning a task holds an intrinsic lock and
+ // an unrelated task state change in the scheduler fires an event that requires this intrinsic
+ // lock. This causes a deadlock when the executor tries to acquire a lock held by the event
+ // fired.
+
+ pruner = prunerWithRealExecutor();
+ Command onDeleted = new Command() {
+ @Override
+ public void execute() {
+ // The goal is to verify that the call does not deadlock. We do not care about the outcome.
+ IScheduledTask b = makeTask("b", ASSIGNED);
+
+ changeState(b, STARTING);
+ }
+ };
+ CountDownLatch taskDeleted = expectTaskDeleted(onDeleted, TASK_ID);
+
+ control.replay();
+
+ // Change the task to a terminal state and wait for it to be pruned.
+ changeState(makeTask(TASK_ID, RUNNING), KILLED);
+ // NOTE(review): await() has no timeout, so a deadlock regression hangs this test instead of
+ // failing it. Consider await(timeout, unit) and asserting on its result.
+ taskDeleted.await();
+ }
+
+ // Builds a pruner on a real single-thread daemon scheduled executor with 1 ms settings, so the
+ // scheduled work really runs. The executor is never shut down; its threads are daemons.
+ private TaskHistoryPruner prunerWithRealExecutor() {
+ ScheduledExecutorService realExecutor = Executors.newScheduledThreadPool(1,
+ new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat("testThreadSafeEvents-executor")
+ .build());
+ return new TaskHistoryPruner(
+ realExecutor,
+ stateManager,
+ clock,
+ new HistoryPrunnerSettings(Amount.of(1L, Time.MILLISECONDS), ONE_MS, PER_JOB_HISTORY),
+ storageUtil.storage);
+ }
+
+ // Expects stateManager.deleteTasks({taskId}). While the mocked call is in progress it releases a
+ // separate dispatch thread that runs onDelete, then blocks until onDelete has finished.
+ // Returns a latch that opens once onDelete has completed.
+ // NOTE(review): the fail() calls below run on the dispatch thread and on whichever thread
+ // invokes deleteTasks (presumably the executor thread), not the JUnit thread. An AssertionError
+ // there is not reported as a test failure.
+ private CountDownLatch expectTaskDeleted(final Command onDelete, String taskId) {
+ final CountDownLatch deleteCalled = new CountDownLatch(1);
+ final CountDownLatch eventDelivered = new CountDownLatch(1);
+
+ Thread eventDispatch = new Thread() {
+ @Override
+ public void run() {
+ try {
+ deleteCalled.await();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ fail("Interrupted while awaiting for delete call.");
+ return;
+ }
+ onDelete.execute();
+ eventDelivered.countDown();
+ }
+ };
+ eventDispatch.setDaemon(true);
+ eventDispatch.setName(getClass().getName() + "-EventDispatch");
+ eventDispatch.start();
+
+ stateManager.deleteTasks(ImmutableSet.of(taskId));
+ expectLastCall().andAnswer(new IAnswer<Void>() {
+ @Override
+ public Void answer() {
+ deleteCalled.countDown();
+ try {
+ eventDelivered.await();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ fail("Interrupted while awaiting for event delivery.");
+ }
+ return null;
+ }
+ });
+
+ return eventDelivered;
+ }
+
+ // Records a void expectation that deleteTasks is called with exactly these task ids.
+ private void expectDeleteTasks(String... tasks) {
+ stateManager.deleteTasks(ImmutableSet.copyOf(tasks));
+ }
+
+ // Expects one delayed prune whose timeout is calculateTimeout(ONE_DAY in ms).
+ // NOTE(review): a duration is passed where a task timestamp is expected, presumably to select the
+ // default timeout. Verify against TaskHistoryPruner.calculateTimeout.
+ private Capture<Runnable> expectDefaultDelayedPrune() {
+ return expectDelayedPrune(ONE_DAY.as(Time.MILLISECONDS), 1);
+ }
+
+ // Expects one delayed prune for a task whose event timestamp is timestampMillis.
+ private Capture<Runnable> expectOneDelayedPrune(long timestampMillis) {
+ return expectDelayedPrune(timestampMillis, 1);
+ }
+
+ // Same as expectImmediatePrune, but no task is expected to be deleted.
+ private void expectNoImmediatePrune(ImmutableSet<IScheduledTask> tasksInJob) {
+ expectImmediatePrune(tasksInJob);
+ }
+
+ // Expects an executor.submit whose Runnable is run inline, and a fetch of the job's history
+ // query that returns tasksInJob. If pruned is non-empty, also expects deleteTasks for exactly
+ // those ids. tasksInJob must all belong to one job (Iterables.getOnlyElement).
+ private void expectImmediatePrune(
+ ImmutableSet<IScheduledTask> tasksInJob,
+ IScheduledTask... pruned) {
+
+ // Expect a deferred prune operation when a new task is being watched.
+ executor.submit(EasyMock.<Runnable>anyObject());
+ expectLastCall().andAnswer(
+ new IAnswer<Future<?>>() {
+ @Override
+ public Future<?> answer() {
+ Runnable work = (Runnable) EasyMock.getCurrentArguments()[0];
+ work.run();
+ return null;
+ }
+ }
+ );
+
+ IJobKey jobKey = Iterables.getOnlyElement(
+ FluentIterable.from(tasksInJob).transform(Tasks.SCHEDULED_TO_JOB_KEY).toSet());
+ storageUtil.expectTaskFetch(TaskHistoryPruner.jobHistoryQuery(jobKey), tasksInJob);
+ if (pruned.length > 0) {
+ stateManager.deleteTasks(Tasks.ids(pruned));
+ }
+ }
+
+ // Expects count calls to executor.schedule(runnable, calculateTimeout(timestampMillis),
+ // MILLISECONDS), each returning the mock future, and returns the captured runnable.
+ // calculateTimeout is evaluated here, while recording, not at replay time.
+ private Capture<Runnable> expectDelayedPrune(long timestampMillis, int count) {
+ Capture<Runnable> capture = createCapture();
+ executor.schedule(
+ EasyMock.capture(capture),
+ eq(pruner.calculateTimeout(timestampMillis)),
+ eq(TimeUnit.MILLISECONDS));
+ expectLastCall().andReturn(future).times(count);
+ return capture;
+ }
+
+ // Publishes a transition to newStateTask from oldStateTask's status.
+ private void changeState(IScheduledTask oldStateTask, IScheduledTask newStateTask) {
+ pruner.recordStateChange(TaskStateChange.transition(newStateTask, oldStateTask.getStatus()));
+ }
+
+ // Publishes a transition of oldStateTask into the given status.
+ private void changeState(IScheduledTask oldStateTask, ScheduleStatus status) {
+ pruner.recordStateChange(
+ TaskStateChange.transition(copy(oldStateTask, status), oldStateTask.getStatus()));
+ }
+
+ // Returns a copy of task with only its status replaced; its task events are kept.
+ private IScheduledTask copy(IScheduledTask task, ScheduleStatus status) {
+ return IScheduledTask.build(task.newBuilder().setStatus(status));
+ }
+
+ // Builds a task in the given job with a single TaskEvent stamped at the current fake-clock time.
+ private IScheduledTask makeTask(
+ String job,
+ String taskId,
+ ScheduleStatus status) {
+
+ return IScheduledTask.build(new ScheduledTask()
+ .setStatus(status)
+ .setTaskEvents(ImmutableList.of(new TaskEvent(clock.nowMillis(), status)))
+ .setAssignedTask(makeAssignedTask(job, taskId)));
+ }
+
+ // Builds a task in JOB_A.
+ private IScheduledTask makeTask(String taskId, ScheduleStatus status) {
+ return makeTask(JOB_A, taskId, status);
+ }
+
+ // Builds an AssignedTask on SLAVE_HOST with a fixed owner, environment and executor config.
+ private AssignedTask makeAssignedTask(String job, String taskId) {
+ return new AssignedTask()
+ .setSlaveHost(SLAVE_HOST)
+ .setTaskId(taskId)
+ .setTask(new TaskConfig()
+ .setOwner(new Identity().setRole("role").setUser("user"))
+ .setEnvironment("staging45")
+ .setJobName(job)
+ .setExecutorConfig(new ExecutorConfig("aurora", "config")));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9834b316/src/test/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStoreTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStoreTest.java b/src/test/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStoreTest.java
index dbf0bad..3871dae 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStoreTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/db/DBJobUpdateStoreTest.java
@@ -65,7 +65,9 @@ import static org.apache.aurora.gen.JobUpdateAction.INSTANCE_UPDATED;
import static org.apache.aurora.gen.JobUpdateAction.INSTANCE_UPDATING;
import static org.apache.aurora.gen.JobUpdateStatus.ABORTED;
import static org.apache.aurora.gen.JobUpdateStatus.ERROR;
+import static org.apache.aurora.gen.JobUpdateStatus.FAILED;
import static org.apache.aurora.gen.JobUpdateStatus.ROLLED_BACK;
+import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_BACK;
import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_FORWARD;
import static org.apache.aurora.gen.JobUpdateStatus.ROLL_BACK_PAUSED;
import static org.apache.aurora.gen.JobUpdateStatus.ROLL_FORWARD_PAUSED;
@@ -383,6 +385,116 @@ public class DBJobUpdateStoreTest {
assertEquals(Optional.<IJobUpdateDetails>absent(), getUpdateDetails(updateId));
}
+ /**
+ * Exercises pruneHistory across two jobs with successively tighter settings.
+ *
+ * <p>JOB holds update1 (saved with lock1, ROLLING_BACK) and update2-4 (no lock; ABORTED,
+ * ROLLED_BACK, FAILED). job2 holds update5-6 (no lock; ERROR, FAILED) and update7 (saved with
+ * lock2, ROLLING_FORWARD). The assertions show that the updates marked "active update"
+ * (update1, update7) are never pruned and are not counted against the per-job retain count.
+ * Event timestamps range from 123 to 126.
+ */
+ @Test
+ public void testPruneHistory() {
+ String updateId1 = "u11";
+ String updateId2 = "u12";
+ String updateId3 = "u13";
+ String updateId4 = "u14";
+ String updateId5 = "u15";
+ String updateId6 = "u16";
+ String updateId7 = "u17";
+
+ IJobKey job2 = JobKeys.from("testRole2", "testEnv2", "job2");
+ IJobUpdate update1 = makeJobUpdate(JOB, updateId1);
+ IJobUpdate update2 = makeJobUpdate(JOB, updateId2);
+ IJobUpdate update3 = makeJobUpdate(JOB, updateId3);
+ IJobUpdate update4 = makeJobUpdate(JOB, updateId4);
+ IJobUpdate update5 = makeJobUpdate(job2, updateId5);
+ IJobUpdate update6 = makeJobUpdate(job2, updateId6);
+ IJobUpdate update7 = makeJobUpdate(job2, updateId7);
+
+ IJobUpdateEvent updateEvent1 = IJobUpdateEvent.build(new JobUpdateEvent(ROLLING_BACK, 123L));
+ IJobUpdateEvent updateEvent2 = IJobUpdateEvent.build(new JobUpdateEvent(ABORTED, 124L));
+ IJobUpdateEvent updateEvent3 = IJobUpdateEvent.build(new JobUpdateEvent(ROLLED_BACK, 125L));
+ IJobUpdateEvent updateEvent4 = IJobUpdateEvent.build(new JobUpdateEvent(FAILED, 126L));
+ IJobUpdateEvent updateEvent5 = IJobUpdateEvent.build(new JobUpdateEvent(ERROR, 123L));
+ IJobUpdateEvent updateEvent6 = IJobUpdateEvent.build(new JobUpdateEvent(FAILED, 125L));
+ IJobUpdateEvent updateEvent7 = IJobUpdateEvent.build(new JobUpdateEvent(ROLLING_FORWARD, 126L));
+
+ // Expected values mirror the status and timestamp of the single event saved for each update.
+ update1 = populateExpected(
+ saveUpdateNoEvent(update1, Optional.of("lock1")), ROLLING_BACK, 123L, 123L);
+ update2 = populateExpected(
+ saveUpdateNoEvent(update2, Optional.<String>absent()), ABORTED, 124L, 124L);
+ update3 = populateExpected(
+ saveUpdateNoEvent(update3, Optional.<String>absent()), ROLLED_BACK, 125L, 125L);
+ update4 = populateExpected(
+ saveUpdateNoEvent(update4, Optional.<String>absent()), FAILED, 126L, 126L);
+ update5 = populateExpected(
+ saveUpdateNoEvent(update5, Optional.<String>absent()), ERROR, 123L, 123L);
+ update6 = populateExpected(
+ saveUpdateNoEvent(update6, Optional.<String>absent()), FAILED, 125L, 125L);
+ update7 = populateExpected(
+ saveUpdateNoEvent(update7, Optional.of("lock2")), ROLLING_FORWARD, 126L, 126L);
+
+ saveJobEvent(updateEvent1, updateId1);
+ saveJobEvent(updateEvent2, updateId2);
+ saveJobEvent(updateEvent3, updateId3);
+ saveJobEvent(updateEvent4, updateId4);
+ saveJobEvent(updateEvent5, updateId5);
+ saveJobEvent(updateEvent6, updateId6);
+ saveJobEvent(updateEvent7, updateId7);
+
+ assertEquals(update1, getUpdate(updateId1).get());
+ assertEquals(update2, getUpdate(updateId2).get());
+ assertEquals(update3, getUpdate(updateId3).get());
+ assertEquals(update4, getUpdate(updateId4).get());
+ assertEquals(update5, getUpdate(updateId5).get());
+ assertEquals(update6, getUpdate(updateId6).get());
+ assertEquals(update7, getUpdate(updateId7).get());
+
+ // Below every event timestamp (123-126), so the passes using it appear to be governed by the
+ // retain count alone.
+ long pruningThreshold = 120L;
+
+ // Retain 3: no job has more than 3 non-active updates, so nothing is pruned.
+ assertEquals(ImmutableSet.<String>of(), pruneHistory(3, pruningThreshold));
+ assertEquals(Optional.of(update7), getUpdate(updateId7)); // active update
+ assertEquals(Optional.of(update6), getUpdate(updateId6));
+ assertEquals(Optional.of(update5), getUpdate(updateId5));
+
+ assertEquals(Optional.of(update4), getUpdate(updateId4));
+ assertEquals(Optional.of(update3), getUpdate(updateId3));
+ assertEquals(Optional.of(update2), getUpdate(updateId2));
+ assertEquals(Optional.of(update1), getUpdate(updateId1)); // active update
+
+ assertEquals(ImmutableSet.of(updateId2), pruneHistory(2, pruningThreshold));
+ // job2: retain 2 prunes nothing. Its two non-active updates (update5, update6) both fit and
+ // the active update7 is not counted.
+ assertEquals(Optional.of(update7), getUpdate(updateId7)); // active update
+ assertEquals(Optional.of(update6), getUpdate(updateId6));
+ assertEquals(Optional.of(update5), getUpdate(updateId5));
+
+ // JOB: update2, the oldest of its three non-active updates, is pruned.
+ assertEquals(Optional.of(update4), getUpdate(updateId4));
+ assertEquals(Optional.of(update3), getUpdate(updateId3));
+ assertEquals(Optional.<IJobUpdate>absent(), getUpdate(updateId2));
+ assertEquals(Optional.of(update1), getUpdate(updateId1)); // active update
+
+ assertEquals(ImmutableSet.of(updateId5, updateId3), pruneHistory(1, pruningThreshold));
+ // job2: update5, the older of its two non-active updates, is pruned.
+ assertEquals(Optional.of(update7), getUpdate(updateId7)); // active update
+ assertEquals(Optional.of(update6), getUpdate(updateId6));
+ assertEquals(Optional.<IJobUpdate>absent(), getUpdate(updateId5));
+
+ // JOB: update3 is pruned as well, leaving update4 and the active update1.
+ assertEquals(Optional.of(update4), getUpdate(updateId4));
+ assertEquals(Optional.<IJobUpdate>absent(), getUpdate(updateId3));
+ assertEquals(Optional.of(update1), getUpdate(updateId1)); // active update
+
+ // Retain 1 alone would prune nothing now, but the 126L threshold prunes update6 (event at
+ // 125). update4 (event at 126) survives, so a timestamp equal to the threshold is kept.
+ assertEquals(ImmutableSet.of(updateId6), pruneHistory(1, 126L));
+ assertEquals(Optional.of(update7), getUpdate(updateId7)); // active update
+ assertEquals(Optional.<IJobUpdate>absent(), getUpdate(updateId6));
+
+ assertEquals(Optional.of(update4), getUpdate(updateId4));
+ assertEquals(Optional.of(update1), getUpdate(updateId1)); // active update
+
+ // Retain 0 prunes the last non-active update (update4). Only the active updates survive.
+ assertEquals(ImmutableSet.of(updateId4), pruneHistory(0, pruningThreshold));
+ assertEquals(Optional.of(update7), getUpdate(updateId7)); // active update
+
+ assertEquals(Optional.<IJobUpdate>absent(), getUpdate(updateId4));
+ assertEquals(Optional.of(update1), getUpdate(updateId1)); // active update
+ }
+
@Test(expected = StorageException.class)
public void testSaveUpdateWithoutLock() {
final IJobUpdate update = makeJobUpdate(JOB, "updateId");
@@ -414,13 +526,7 @@ public class DBJobUpdateStoreTest {
final IJobUpdate update = makeJobUpdate(JOB, "update1");
saveUpdate(update, Optional.of("lock1"));
- storage.write(new MutateWork.NoResult.Quiet() {
- @Override
- public void execute(MutableStoreProvider storeProvider) {
- storeProvider.getLockStore().removeLock(
- makeLock(update.getSummary().getJobKey(), "lock1").getKey());
- }
- });
+ removeLock(update, "lock1");
assertEquals(
Optional.of(updateJobDetails(populateExpected(update), FIRST_EVENT)),
@@ -614,7 +720,7 @@ public class DBJobUpdateStoreTest {
.setUser("fake user"));
}
- private void saveUpdate(final IJobUpdate update, final Optional<String> lockToken) {
+ private IJobUpdate saveUpdate(final IJobUpdate update, final Optional<String> lockToken) {
storage.write(new MutateWork.NoResult.Quiet() {
@Override
public void execute(MutableStoreProvider storeProvider) {
@@ -628,6 +734,23 @@ public class DBJobUpdateStoreTest {
update.getSummary().getUpdateId());
}
});
+
+ return update;
+ }
+
+ private IJobUpdate saveUpdateNoEvent(final IJobUpdate update, final Optional<String> lockToken) {
+ storage.write(new MutateWork.NoResult.Quiet() {
+ @Override
+ public void execute(MutableStoreProvider storeProvider) {
+ if (lockToken.isPresent()) {
+ storeProvider.getLockStore().saveLock(
+ makeLock(update.getSummary().getJobKey(), lockToken.get()));
+ }
+ storeProvider.getJobUpdateStore().saveJobUpdate(update, lockToken);
+ }
+ });
+
+ return update;
}
private void saveJobEvent(final IJobUpdateEvent event, final String updateId) {
@@ -657,6 +780,25 @@ public class DBJobUpdateStoreTest {
});
}
+ /**
+ * Runs the job update store's pruneHistory in a write transaction.
+ *
+ * @return the ids of the updates the store reports as pruned.
+ */
+ private Set<String> pruneHistory(final int retainCount, final long pruningThresholdMs) {
+ Set<String> prunedIds = storage.write(new MutateWork.Quiet<Set<String>>() {
+ @Override
+ public Set<String> apply(MutableStoreProvider stores) {
+ return stores.getJobUpdateStore().pruneHistory(retainCount, pruningThresholdMs);
+ }
+ });
+ return prunedIds;
+ }
+
+ /** Removes, in a write transaction, the lock with {@code lockToken} on {@code update}'s job. */
+ private void removeLock(final IJobUpdate update, final String lockToken) {
+ storage.write(new MutateWork.NoResult.Quiet() {
+ @Override
+ public void execute(MutableStoreProvider stores) {
+ stores.getLockStore().removeLock(
+ makeLock(update.getSummary().getJobKey(), lockToken).getKey());
+ }
+ });
+ }
+
private IJobUpdate populateExpected(IJobUpdate update) {
return populateExpected(update, ROLLING_FORWARD, CREATED_MS, CREATED_MS);
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9834b316/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
index 68df0d5..7a8c3b8 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
@@ -55,6 +55,7 @@ import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.gen.TaskConfig;
import org.apache.aurora.gen.storage.LogEntry;
import org.apache.aurora.gen.storage.Op;
+import org.apache.aurora.gen.storage.PruneJobUpdateHistory;
import org.apache.aurora.gen.storage.RemoveJob;
import org.apache.aurora.gen.storage.RemoveLock;
import org.apache.aurora.gen.storage.RemoveQuota;
@@ -854,6 +855,32 @@ public class LogStorageTest extends EasyMockTest {
}.run();
}
+ /**
+ * Verifies that a pruneHistory call made through the mutable store provider is delegated to the
+ * underlying job update store and logged as a transaction holding one pruneJobUpdateHistory op
+ * with the same retain count and threshold.
+ */
+ @Test
+ public void testPruneHistory() throws Exception {
+ final PruneJobUpdateHistory pruneHistory = new PruneJobUpdateHistory()
+ .setHistoryPruneThresholdMs(1L)
+ .setPerJobRetainCount(1);
+
+ new MutationFixture() {
+ @Override
+ protected void setupExpectations() throws Exception {
+ storageUtil.expectWriteOperation();
+ // The pruned ids returned by the store are not part of the logged op, which carries
+ // only the retain count and threshold set above.
+ expect(storageUtil.jobUpdateStore.pruneHistory(
+ pruneHistory.getPerJobRetainCount(),
+ pruneHistory.getHistoryPruneThresholdMs())).andReturn(ImmutableSet.of("id1"));
+
+ streamMatcher.expectTransaction(Op.pruneJobUpdateHistory(pruneHistory)).andReturn(position);
+ }
+
+ @Override
+ protected void performMutations(MutableStoreProvider storeProvider) {
+ storeProvider.getJobUpdateStore().pruneHistory(
+ pruneHistory.getPerJobRetainCount(),
+ pruneHistory.getHistoryPruneThresholdMs());
+ }
+ }.run();
+ }
+
private LogEntry createTransaction(Op... ops) {
return LogEntry.transaction(
new Transaction(ImmutableList.copyOf(ops), storageConstants.CURRENT_SCHEMA_VERSION));
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9834b316/src/test/java/org/apache/aurora/scheduler/testing/FakeScheduledExecutor.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/testing/FakeScheduledExecutor.java b/src/test/java/org/apache/aurora/scheduler/testing/FakeScheduledExecutor.java
new file mode 100644
index 0000000..1688a33
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/testing/FakeScheduledExecutor.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.testing;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.twitter.common.collections.Pair;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.util.testing.FakeClock;
+
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+
+import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * A simulated scheduled executor that records scheduled work and executes it when the clock is
+ * advanced past its execution time.
+ *
+ * <p>Work is captured by EasyMock answers installed on a mocked {@link ScheduledExecutorService}.
+ * The futures handed back to the code under test are {@code null}, so that code must not use
+ * them. The clock only moves forward through {@link #advance(Amount)}.
+ */
+public final class FakeScheduledExecutor extends FakeClock {
+
+  // Pending work, each paired with the absolute clock time (millis) at which it becomes due.
+  private final List<Pair<Long, Runnable>> deferredWork = Lists.newArrayList();
+
+  private FakeScheduledExecutor() { }
+
+  /**
+   * Expects any number of {@code schedule(Runnable, long, TimeUnit.MILLISECONDS)} calls on
+   * {@code mock}, each deferring its work once by the requested delay.
+   *
+   * @param mock Mocked executor on which to record the expectation.
+   * @return A clock that runs the captured work as it is advanced.
+   */
+  public static FakeScheduledExecutor scheduleExecutor(ScheduledExecutorService mock) {
+    FakeScheduledExecutor executor = new FakeScheduledExecutor();
+    mock.schedule(
+        EasyMock.<Runnable>anyObject(),
+        EasyMock.anyLong(),
+        EasyMock.eq(TimeUnit.MILLISECONDS));
+    expectLastCall().andAnswer(addExpectations(executor, 1)).anyTimes();
+
+    return executor;
+  }
+
+  /**
+   * Expects exactly one
+   * {@code scheduleAtFixedRate(Runnable, long, long, TimeUnit.MILLISECONDS)} call on
+   * {@code mock}. The first run is due after the initial delay and each later run one period
+   * after the previous one, for {@code maxInvocations} runs in total.
+   *
+   * @param mock Mocked executor on which to record the expectation.
+   * @param maxInvocations Number of periodic runs to simulate.
+   * @return A clock that runs the captured work as it is advanced.
+   */
+  public static FakeScheduledExecutor scheduleAtFixedRateExecutor(
+      ScheduledExecutorService mock,
+      int maxInvocations) {
+
+    FakeScheduledExecutor executor = new FakeScheduledExecutor();
+    mock.scheduleAtFixedRate(
+        EasyMock.<Runnable>anyObject(),
+        EasyMock.anyLong(),
+        EasyMock.anyLong(),
+        EasyMock.eq(TimeUnit.MILLISECONDS));
+    expectLastCall().andAnswer(addExpectations(executor, maxInvocations)).once();
+
+    return executor;
+  }
+
+  /**
+   * Creates an answer that records {@code workCount} deferred runs of the scheduled runnable.
+   *
+   * <p>Handles both {@code schedule(Runnable, long delay, TimeUnit)} and
+   * {@code scheduleAtFixedRate(Runnable, long initialDelay, long period, TimeUnit)}. Run
+   * {@code i} (0-based) is due at {@code now + initialDelay + i * period}. For {@code schedule},
+   * which has no period, the delay is used as the period.
+   */
+  private static IAnswer<ScheduledFuture<?>> addExpectations(
+      final FakeScheduledExecutor executor,
+      final int workCount) {
+
+    return new IAnswer<ScheduledFuture<?>>() {
+      @Override
+      public ScheduledFuture<?> answer() {
+        Object[] args = EasyMock.getCurrentArguments();
+        Runnable work = (Runnable) args[0];
+        long initialDelayMillis = (Long) args[1];
+        Preconditions.checkArgument(
+            initialDelayMillis > 0, "Delay must be positive: %s", initialDelayMillis);
+        // For schedule(), args[2] is the TimeUnit. For scheduleAtFixedRate(), it is the period.
+        long periodMillis = args[2] instanceof Long ? (Long) args[2] : initialDelayMillis;
+        Preconditions.checkArgument(periodMillis > 0, "Period must be positive: %s", periodMillis);
+
+        long startMillis = executor.nowMillis();
+        for (int i = 0; i < workCount; i++) {
+          executor.deferredWork.add(
+              Pair.of(startMillis + initialDelayMillis + i * periodMillis, work));
+        }
+        return null;
+      }
+    };
+  }
+
+  /**
+   * Always throws. Absolute time jumps are not supported; use {@link #advance(Amount)} so that
+   * deferred work is run.
+   */
+  @Override
+  public void setNowMillis(long nowMillis) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Advances the clock, then runs every deferred piece of work that is now due, in the order it
+   * was recorded. Work scheduled while those runs execute is only considered by a later call.
+   */
+  @Override
+  public void advance(Amount<Long, Time> period) {
+    super.advance(period);
+    long now = nowMillis();
+    Iterator<Pair<Long, Runnable>> entries = deferredWork.iterator();
+    List<Runnable> toExecute = Lists.newArrayList();
+    while (entries.hasNext()) {
+      Pair<Long, Runnable> next = entries.next();
+      if (next.getFirst() <= now) {
+        entries.remove();
+        toExecute.add(next.getSecond());
+      }
+    }
+    for (Runnable work : toExecute) {
+      work.run();
+    }
+  }
+
+  /** Asserts that no deferred work is still pending. */
+  public void assertEmpty() {
+    assertEquals(ImmutableList.<Pair<Long, Runnable>>of(), deferredWork);
+  }
+}