You are viewing a plain-text version of this content. The canonical version, including the original hyperlink, is in the mailing-list archive.
Posted to commits@aurora.apache.org by wf...@apache.org on 2014/03/07 01:27:48 UTC
git commit: Count on task timeouts and task pruning to be idempotent,
simplifying handling code.
Repository: incubator-aurora
Updated Branches:
refs/heads/master 6e4fe853e -> bb52bcbf6
Count on task timeouts and task pruning to be idempotent, simplifying handling
code.
Reviewed at https://reviews.apache.org/r/18484/
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/bb52bcbf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/bb52bcbf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/bb52bcbf
Branch: refs/heads/master
Commit: bb52bcbf6538d7de53d20849c0fdffca0137f9c2
Parents: 6e4fe85
Author: Bill Farner <wf...@apache.org>
Authored: Thu Mar 6 16:26:49 2014 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Thu Mar 6 16:26:54 2014 -0800
----------------------------------------------------------------------
.../aurora/scheduler/async/HistoryPruner.java | 100 +++------
.../aurora/scheduler/async/TaskTimeout.java | 196 ++---------------
.../scheduler/async/HistoryPrunerTest.java | 218 ++++++++-----------
.../aurora/scheduler/async/TaskTimeoutTest.java | 96 --------
4 files changed, 156 insertions(+), 454 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bb52bcbf/src/main/java/org/apache/aurora/scheduler/async/HistoryPruner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/HistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/async/HistoryPruner.java
index c80c000..5bf9838 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/HistoryPruner.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/HistoryPruner.java
@@ -18,10 +18,7 @@ package org.apache.aurora.scheduler.async;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;
import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
import java.util.Set;
-import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
@@ -29,20 +26,20 @@ import java.util.logging.Logger;
import javax.inject.Inject;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
-import com.google.common.collect.LinkedHashMultimap;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
import com.google.common.eventbus.Subscribe;
import com.google.inject.BindingAnnotation;
import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;
import com.twitter.common.util.Clock;
+import org.apache.aurora.gen.apiConstants;
+import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -55,7 +52,6 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
import static org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
-import static org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
/**
* Prunes tasks in a job based on per-job history and an inactive time threshold by observing tasks
@@ -64,19 +60,12 @@ import static org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
public class HistoryPruner implements EventSubscriber {
private static final Logger LOG = Logger.getLogger(HistoryPruner.class.getName());
- private final Multimap<IJobKey, String> tasksByJob =
- Multimaps.synchronizedSetMultimap(LinkedHashMultimap.<IJobKey, String>create());
- @VisibleForTesting
- Multimap<IJobKey, String> getTasksByJob() {
- return tasksByJob;
- }
-
private final ScheduledExecutorService executor;
private final StateManager stateManager;
private final Clock clock;
private final long pruneThresholdMillis;
private final int perJobHistoryGoal;
- private final Map<String, Future<?>> taskIdToFuture = Maps.newConcurrentMap();
+ private final Storage storage;
@BindingAnnotation
@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
@@ -88,13 +77,15 @@ public class HistoryPruner implements EventSubscriber {
final StateManager stateManager,
final Clock clock,
@PruneThreshold Amount<Long, Time> inactivePruneThreshold,
- @PruneThreshold int perJobHistoryGoal) {
+ @PruneThreshold int perJobHistoryGoal,
+ Storage storage) {
this.executor = checkNotNull(executor);
this.stateManager = checkNotNull(stateManager);
this.clock = checkNotNull(clock);
this.pruneThresholdMillis = inactivePruneThreshold.as(Time.MILLISECONDS);
this.perJobHistoryGoal = perJobHistoryGoal;
+ this.storage = checkNotNull(storage);
}
@VisibleForTesting
@@ -125,21 +116,9 @@ public class HistoryPruner implements EventSubscriber {
stateManager.deleteTasks(taskIds);
}
- /**
- * When triggered, removes the tasks scheduled for pruning and cancels any existing future.
- *
- * @param event A new TasksDeleted event.
- */
- @Subscribe
- public void tasksDeleted(final TasksDeleted event) {
- for (IScheduledTask task : event.getTasks()) {
- String id = Tasks.id(task);
- tasksByJob.remove(Tasks.SCHEDULED_TO_JOB_KEY.apply(task), id);
- Future<?> future = taskIdToFuture.remove(id);
- if (future != null) {
- future.cancel(false);
- }
- }
+ @VisibleForTesting
+ static Query.Builder jobHistoryQuery(IJobKey jobKey) {
+ return Query.jobScoped(jobKey).byStatus(apiConstants.TERMINAL_STATES);
}
private void registerInactiveTask(
@@ -148,41 +127,34 @@ public class HistoryPruner implements EventSubscriber {
long timeRemaining) {
LOG.fine("Prune task " + taskId + " in " + timeRemaining + " ms.");
- // Insert the latest inactive task at the tail.
- tasksByJob.put(jobKey, taskId);
- Runnable runnable = new Runnable() {
+ executor.schedule(
+ new Runnable() {
+ @Override
+ public void run() {
+ LOG.info("Pruning expired inactive task " + taskId);
+ deleteTasks(ImmutableSet.of(taskId));
+ }
+ },
+ timeRemaining,
+ TimeUnit.MILLISECONDS);
+
+ executor.submit(new Runnable() {
@Override
public void run() {
- LOG.info("Pruning expired inactive task " + taskId);
- tasksByJob.remove(jobKey, taskId);
- taskIdToFuture.remove(taskId);
- deleteTasks(ImmutableSet.of(taskId));
- }
- };
- taskIdToFuture.put(taskId, executor.schedule(runnable, timeRemaining, TimeUnit.MILLISECONDS));
-
- ImmutableSet.Builder<String> pruneTaskIds = ImmutableSet.builder();
- Collection<String> tasks = tasksByJob.get(jobKey);
- // From Multimaps javadoc: "It is imperative that the user manually synchronize on the returned
- // multimap when accessing any of its collection views".
- synchronized (tasksByJob) {
- Iterator<String> iterator = tasks.iterator();
- while (tasks.size() > perJobHistoryGoal) {
- // Pick oldest task from the head. Guaranteed by LinkedHashMultimap based on insertion
- // order.
- String id = iterator.next();
- iterator.remove();
- pruneTaskIds.add(id);
- Future<?> future = taskIdToFuture.remove(id);
- if (future != null) {
- future.cancel(false);
+ Collection<IScheduledTask> inactiveTasks =
+ Storage.Util.weaklyConsistentFetchTasks(storage, jobHistoryQuery(jobKey));
+ int tasksToPrune = inactiveTasks.size() - perJobHistoryGoal;
+ if (tasksToPrune > 0) {
+ if (inactiveTasks.size() > perJobHistoryGoal) {
+ Set<String> toPrune = FluentIterable
+ .from(Tasks.LATEST_ACTIVITY.sortedCopy(inactiveTasks))
+ .limit(tasksToPrune)
+ .transform(Tasks.SCHEDULED_TO_ID)
+ .toSet();
+ deleteTasks(toPrune);
+ }
}
}
- }
-
- Set<String> ids = pruneTaskIds.build();
- if (!ids.isEmpty()) {
- deleteTasks(ids);
- }
+ });
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bb52bcbf/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java b/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
index 82b483b..de79912 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java
@@ -16,9 +16,7 @@
package org.apache.aurora.scheduler.async;
import java.util.EnumSet;
-import java.util.Map;
import java.util.Set;
-import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -27,19 +25,11 @@ import java.util.logging.Logger;
import javax.inject.Inject;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Objects;
import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
-import com.google.common.base.Supplier;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Ordering;
import com.google.common.eventbus.Subscribe;
import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;
import com.twitter.common.stats.StatsProvider;
-import com.twitter.common.util.Clock;
import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
@@ -59,9 +49,6 @@ class TaskTimeout implements EventSubscriber {
static final String TIMED_OUT_TASKS_COUNTER = "timed_out_tasks";
@VisibleForTesting
- static final String TRANSIENT_COUNT_STAT_NAME = "transient_states";
-
- @VisibleForTesting
static final Optional<String> TIMEOUT_MESSAGE = Optional.of("Task timed out");
@VisibleForTesting
@@ -72,76 +59,22 @@ class TaskTimeout implements EventSubscriber {
ScheduleStatus.KILLING,
ScheduleStatus.DRAINING);
- private final Map<TimeoutKey, Context> futures = Maps.newConcurrentMap();
-
- private static final class TimeoutKey {
- private final String taskId;
- private final ScheduleStatus status;
-
- private TimeoutKey(String taskId, ScheduleStatus status) {
- this.taskId = taskId;
- this.status = status;
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(taskId, status);
- }
-
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof TimeoutKey)) {
- return false;
- }
- TimeoutKey key = (TimeoutKey) o;
- return Objects.equal(taskId, key.taskId)
- && (status == key.status);
- }
-
- @Override
- public String toString() {
- return taskId + ":" + status;
- }
- }
-
private final ScheduledExecutorService executor;
private final StateManager stateManager;
private final long timeoutMillis;
- private final Clock clock;
private final AtomicLong timedOutTasks;
@Inject
TaskTimeout(
ScheduledExecutorService executor,
StateManager stateManager,
- final Clock clock,
Amount<Long, Time> timeout,
StatsProvider statsProvider) {
this.executor = checkNotNull(executor);
this.stateManager = checkNotNull(stateManager);
this.timeoutMillis = timeout.as(Time.MILLISECONDS);
- this.clock = checkNotNull(clock);
this.timedOutTasks = statsProvider.makeCounter(TIMED_OUT_TASKS_COUNTER);
-
- exportStats(statsProvider);
- }
-
- private void registerTimeout(TimeoutKey key) {
- // This is an obvious check-then-act, but:
- // - there isn't much of a better option, given that we have to get the Future before
- // inserting into the map
- // - a key collision only happens in practice if something is wrong externally to this class
- // (double event for the same state)
- // - the outcome is low-risk, we would wind up with a redundant Future that will eventually
- // no-op
- if (!futures.containsKey(key)) {
- Future<?> timeoutHandler = executor.schedule(
- new TimedOutTaskHandler(key),
- timeoutMillis,
- TimeUnit.MILLISECONDS);
- futures.put(key, new Context(clock.nowMillis(), timeoutHandler));
- }
}
private static boolean isTransient(ScheduleStatus status) {
@@ -150,112 +83,31 @@ class TaskTimeout implements EventSubscriber {
@Subscribe
public void recordStateChange(TaskStateChange change) {
- String taskId = change.getTaskId();
- ScheduleStatus newState = change.getNewState();
- if (change.isTransition() && isTransient(change.getOldState().get())) {
- TimeoutKey oldKey = new TimeoutKey(taskId, change.getOldState().get());
- Context context = futures.remove(oldKey);
- if (context != null) {
- LOG.fine("Canceling state timeout for task " + oldKey);
- context.future.cancel(false);
- }
- }
-
+ final String taskId = change.getTaskId();
+ final ScheduleStatus newState = change.getNewState();
if (isTransient(newState)) {
- registerTimeout(new TimeoutKey(taskId, change.getNewState()));
- }
- }
-
- private class TimedOutTaskHandler implements Runnable {
- private final TimeoutKey key;
-
- TimedOutTaskHandler(TimeoutKey key) {
- this.key = key;
- }
-
- @Override
- public void run() {
- Context context = futures.get(key);
- try {
- if (context == null) {
- LOG.warning("Timeout context not found for " + key);
- return;
- }
-
- LOG.info("Timeout reached for task " + key);
- // This query acts as a CAS by including the state that we expect the task to be in if the
- // timeout is still valid. Ideally, the future would have already been canceled, but in the
- // event of a state transition race, including transientState prevents an unintended
- // task timeout.
- // Note: This requires LOST transitions trigger Driver.killTask.
- if (stateManager.changeState(
- key.taskId,
- Optional.of(key.status),
- ScheduleStatus.LOST,
- TIMEOUT_MESSAGE)) {
-
- timedOutTasks.incrementAndGet();
- } else {
- LOG.warning("Task " + key + " does not exist, or was not in the expected state.");
- }
- } finally {
- futures.remove(key);
- }
- }
- }
-
- private class Context {
- private final long timestampMillis;
- private final Future<?> future;
-
- Context(long timestampMillis, Future<?> future) {
- this.timestampMillis = timestampMillis;
- this.future = future;
- }
- }
-
- private static final Function<Context, Long> CONTEXT_TIMESTAMP = new Function<Context, Long>() {
- @Override
- public Long apply(Context context) {
- return context.timestampMillis;
- }
- };
-
- private static final Ordering<Context> TIMESTAMP_ORDER =
- Ordering.natural().onResultOf(CONTEXT_TIMESTAMP);
-
- @VisibleForTesting
- static String waitingTimeStatName(ScheduleStatus status) {
- return "scheduler_max_" + status + "_waiting_ms";
- }
-
- private void exportStats(StatsProvider statsProvider) {
- statsProvider.makeGauge(TRANSIENT_COUNT_STAT_NAME, new Supplier<Number>() {
- @Override
- public Number get() {
- return futures.size();
- }
- });
-
- for (final ScheduleStatus status : TRANSIENT_STATES) {
- statsProvider.makeGauge(waitingTimeStatName(status), new Supplier<Number>() {
- private final Predicate<TimeoutKey> statusMatcher = new Predicate<TimeoutKey>() {
- @Override
- public boolean apply(TimeoutKey key) {
- return key.status == status;
- }
- };
-
- @Override
- public Number get() {
- Iterable<Context> matches = Maps.filterKeys(futures, statusMatcher).values();
- if (Iterables.isEmpty(matches)) {
- return 0L;
- } else {
- return clock.nowMillis() - TIMESTAMP_ORDER.min(matches).timestampMillis;
- }
- }
- });
+ executor.schedule(
+ new Runnable() {
+ @Override
+ public void run() {
+ // This query acts as a CAS by including the state that we expect the task to be in if
+ // the timeout is still valid. Timeouts are never canceled when a task changes state,
+ // so if the task has since left newState, the expected-state mismatch turns this into
+ // a no-op rather than an unintended task timeout.
+ // Note: This requires LOST transitions trigger Driver.killTask.
+ if (stateManager.changeState(
+ taskId,
+ Optional.of(newState),
+ ScheduleStatus.LOST,
+ TIMEOUT_MESSAGE)) {
+
+ LOG.info("Timeout reached for task " + taskId + ":" + taskId);
+ timedOutTasks.incrementAndGet();
+ }
+ }
+ },
+ timeoutMillis,
+ TimeUnit.MILLISECONDS);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bb52bcbf/src/test/java/org/apache/aurora/scheduler/async/HistoryPrunerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/HistoryPrunerTest.java b/src/test/java/org/apache/aurora/scheduler/async/HistoryPrunerTest.java
index 811f68c..f7c9e5e 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/HistoryPrunerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/HistoryPrunerTest.java
@@ -17,13 +17,15 @@ package org.apache.aurora.scheduler.async;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.twitter.common.base.Command;
import com.twitter.common.quantity.Amount;
@@ -43,11 +45,10 @@ import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
-import org.easymock.IExpectationSetters;
-import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -57,11 +58,8 @@ import static org.apache.aurora.gen.ScheduleStatus.KILLED;
import static org.apache.aurora.gen.ScheduleStatus.LOST;
import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
import static org.apache.aurora.gen.ScheduleStatus.STARTING;
-import static org.apache.aurora.scheduler.events.PubsubEvent.TasksDeleted;
import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
public class HistoryPrunerTest extends EasyMockTest {
@@ -77,6 +75,7 @@ public class HistoryPrunerTest extends EasyMockTest {
private ScheduledExecutorService executor;
private FakeClock clock;
private StateManager stateManager;
+ private StorageTestUtil storageUtil;
private HistoryPruner pruner;
@Before
@@ -85,21 +84,15 @@ public class HistoryPrunerTest extends EasyMockTest {
executor = createMock(ScheduledExecutorService.class);
clock = new FakeClock();
stateManager = createMock(StateManager.class);
+ storageUtil = new StorageTestUtil(this);
+ storageUtil.expectOperations();
pruner = new HistoryPruner(
executor,
stateManager,
clock,
ONE_DAY,
- PER_JOB_HISTORY);
- }
-
- @After
- public void validateNoLeak() {
- synchronized (pruner.getTasksByJob()) {
- assertEquals(
- ImmutableMultimap.<IJobKey, String>of(),
- ImmutableMultimap.copyOf(pruner.getTasksByJob()));
- }
+ PER_JOB_HISTORY,
+ storageUtil.storage);
}
@Test
@@ -111,18 +104,15 @@ public class HistoryPrunerTest extends EasyMockTest {
long taskBTimestamp = clock.nowMillis();
IScheduledTask b = makeTask("b", LOST);
- expectOneTaskWatch(taskATimestamp);
- expectOneTaskWatch(taskBTimestamp);
-
- expectCancelFuture().times(2);
+ expectNoImmediatePrune(ImmutableSet.of(a));
+ expectOneDelayedPrune(taskATimestamp);
+ expectNoImmediatePrune(ImmutableSet.of(a, b));
+ expectOneDelayedPrune(taskBTimestamp);
control.replay();
pruner.recordStateChange(TaskStateChange.initialized(a));
pruner.recordStateChange(TaskStateChange.initialized(b));
-
- // Clean-up
- pruner.tasksDeleted(new TasksDeleted(ImmutableSet.of(a, b)));
}
@Test
@@ -142,53 +132,48 @@ public class HistoryPrunerTest extends EasyMockTest {
IScheduledTask d = makeTask("d", FINISHED);
IScheduledTask e = makeTask("job-x", "e", FINISHED);
- expectOneTaskWatch(taskATimestamp);
- expectOneTaskWatch(taskBTimestamp);
- expectOneTaskWatch(taskCTimestamp);
- expectDefaultTaskWatch();
- expectDefaultTaskWatch();
-
- // Cancel future and delete pruned task "a" asynchronously.
- expectCancelFuture();
- stateManager.deleteTasks(Tasks.ids(a));
-
- // Cancel future and delete pruned task "b" asynchronously.
- expectCancelFuture();
- stateManager.deleteTasks(Tasks.ids(b));
-
- expectCancelFuture().times(3);
+ expectNoImmediatePrune(ImmutableSet.of(a));
+ expectOneDelayedPrune(taskATimestamp);
+ expectNoImmediatePrune(ImmutableSet.of(a, b));
+ expectOneDelayedPrune(taskBTimestamp);
+ expectImmediatePrune(ImmutableSet.of(a, b, c), a);
+ expectOneDelayedPrune(taskCTimestamp);
+ expectImmediatePrune(ImmutableSet.of(b, c, d), b);
+ expectDefaultDelayedPrune();
+ expectNoImmediatePrune(ImmutableSet.of(e));
+ expectDefaultDelayedPrune();
control.replay();
for (IScheduledTask task : ImmutableList.of(a, b, c, d, e)) {
pruner.recordStateChange(TaskStateChange.initialized(task));
}
-
- // Clean-up
- pruner.tasksDeleted(new TasksDeleted(ImmutableSet.of(c, d, e)));
}
@Test
public void testStateChange() {
- expectDefaultTaskWatch();
+ IScheduledTask starting = makeTask("a", STARTING);
+ IScheduledTask running = copy(starting, RUNNING);
+ IScheduledTask killed = copy(starting, KILLED);
- expectCancelFuture();
+ expectNoImmediatePrune(ImmutableSet.of(killed));
+ expectDefaultDelayedPrune();
control.replay();
// No future set for non-terminal state transition.
- changeState(STARTING, RUNNING);
+ changeState(starting, running);
// Future set for terminal state transition.
- IScheduledTask a = changeState(RUNNING, KILLED);
-
- // Clean-up
- pruner.tasksDeleted(new TasksDeleted(ImmutableSet.of(a)));
+ changeState(running, killed);
}
@Test
public void testActivateFutureAndExceedHistoryGoal() {
- Capture<Runnable> delayedDelete = expectDefaultTaskWatch();
+ IScheduledTask running = makeTask("a", RUNNING);
+ IScheduledTask killed = copy(running, KILLED);
+ expectNoImmediatePrune(ImmutableSet.of(running));
+ Capture<Runnable> delayedDelete = expectDefaultDelayedPrune();
// Expect task "a" to be pruned when future is activated.
stateManager.deleteTasks(ImmutableSet.of("a"));
@@ -196,7 +181,7 @@ public class HistoryPrunerTest extends EasyMockTest {
control.replay();
// Capture future for inactive task "a"
- changeState("a", RUNNING, KILLED);
+ changeState(running, killed);
clock.advance(ONE_HOUR);
// Execute future to prune task "a" from the system.
delayedDelete.getValue().run();
@@ -204,43 +189,30 @@ public class HistoryPrunerTest extends EasyMockTest {
@Test
public void testJobHistoryExceeded() {
- // Future for tasks - a,b,c
- expectDefaultTaskWatchTimes(3);
-
- // Cancel future and delete task "a" asynchronously when history goal is exceeded.
- expectCancelFuture();
- stateManager.deleteTasks(ImmutableSet.of("a"));
-
- expectCancelFuture().times(2);
-
- control.replay();
-
- changeState("a", RUNNING, KILLED);
+ IScheduledTask a = makeTask("a", RUNNING);
clock.advance(ONE_HOUR);
- IScheduledTask b = changeState("b", RUNNING, KILLED);
+ IScheduledTask aKilled = copy(a, KILLED);
+
+ IScheduledTask b = makeTask("b", RUNNING);
clock.advance(ONE_HOUR);
- IScheduledTask c = changeState("c", RUNNING, LOST);
+ IScheduledTask bKilled = copy(b, KILLED);
- // Clean-up
- pruner.tasksDeleted(new TasksDeleted(ImmutableSet.of(b, c)));
- }
+ IScheduledTask c = makeTask("c", RUNNING);
+ clock.advance(ONE_HOUR);
+ IScheduledTask cLost = copy(c, LOST);
- @Test
- public void testTasksDeleted() {
- IScheduledTask a = makeTask("a", FINISHED);
- IScheduledTask b = makeTask("b", FINISHED);
- expectDefaultTaskWatch();
- expectCancelFuture();
+ expectNoImmediatePrune(ImmutableSet.of(a));
+ expectDefaultDelayedPrune();
+ expectNoImmediatePrune(ImmutableSet.of(a, b));
+ expectDefaultDelayedPrune();
+ expectImmediatePrune(ImmutableSet.of(a, b, c), a);
+ expectDefaultDelayedPrune();
control.replay();
- pruner.recordStateChange(TaskStateChange.initialized(a));
-
- // Cancels existing future for task 'a'
- pruner.tasksDeleted(new TasksDeleted(ImmutableSet.of(a)));
-
- // No-Op
- pruner.tasksDeleted(new TasksDeleted(ImmutableSet.of(b)));
+ changeState(a, aKilled);
+ changeState(b, bKilled);
+ changeState(c, cLost);
}
// TODO(William Farner): Consider removing the thread safety tests. Now that intrinsic locks
@@ -257,32 +229,9 @@ public class HistoryPrunerTest extends EasyMockTest {
@Override
public void execute() {
// The goal is to verify that the call does not deadlock. We do not care about the outcome.
- changeState("b", ASSIGNED, STARTING);
- }
- };
- CountDownLatch taskDeleted = expectTaskDeleted(onDeleted, TASK_ID);
-
- control.replay();
-
- // Change the task to a terminal state and wait for it to be pruned.
- changeState(TASK_ID, RUNNING, KILLED);
- taskDeleted.await();
- }
-
- @Test
- public void testThreadSafeDeleteEvents() throws Exception {
- // This tests against regression where deleting a task causes an event to be fired
- // synchronously (on the EventBus) from a separate thread. This posed a problem because
- // the event handler was synchronized, causing the EventBus thread to deadlock acquiring
- // the lock held by the thread deleting tasks.
+ IScheduledTask b = makeTask("b", ASSIGNED);
- pruner = prunerWithRealExecutor();
- Command onDeleted = new Command() {
- @Override
- public void execute() {
- // The goal is to verify that the call does not deadlock. We do not care about the outcome.
- pruner.tasksDeleted(
- new TasksDeleted(ImmutableSet.of(makeTask("a", ScheduleStatus.KILLED))));
+ changeState(b, STARTING);
}
};
CountDownLatch taskDeleted = expectTaskDeleted(onDeleted, TASK_ID);
@@ -290,7 +239,7 @@ public class HistoryPrunerTest extends EasyMockTest {
control.replay();
// Change the task to a terminal state and wait for it to be pruned.
- changeState(TASK_ID, RUNNING, KILLED);
+ changeState(makeTask(TASK_ID, RUNNING), KILLED);
taskDeleted.await();
}
@@ -305,7 +254,8 @@ public class HistoryPrunerTest extends EasyMockTest {
stateManager,
clock,
Amount.of(1L, Time.MILLISECONDS),
- PER_JOB_HISTORY);
+ PER_JOB_HISTORY,
+ storageUtil.storage);
}
private CountDownLatch expectTaskDeleted(final Command onDelete, String taskId) {
@@ -348,19 +298,44 @@ public class HistoryPrunerTest extends EasyMockTest {
return eventDelivered;
}
- private Capture<Runnable> expectDefaultTaskWatch() {
- return expectTaskWatch(ONE_DAY.as(Time.MILLISECONDS), 1);
+ private Capture<Runnable> expectDefaultDelayedPrune() {
+ return expectDelayedPrune(ONE_DAY.as(Time.MILLISECONDS), 1);
}
- private Capture<Runnable> expectDefaultTaskWatchTimes(int count) {
- return expectTaskWatch(ONE_DAY.as(Time.MILLISECONDS), count);
+ private Capture<Runnable> expectOneDelayedPrune(long timestampMillis) {
+ return expectDelayedPrune(timestampMillis, 1);
}
- private Capture<Runnable> expectOneTaskWatch(long timestampMillis) {
- return expectTaskWatch(timestampMillis, 1);
+ private void expectNoImmediatePrune(ImmutableSet<IScheduledTask> tasksInJob) {
+ expectImmediatePrune(tasksInJob);
+ }
+
+ private void expectImmediatePrune(
+ ImmutableSet<IScheduledTask> tasksInJob,
+ IScheduledTask... pruned) {
+
+ // Expect a deferred prune operation when a new task is being watched.
+ executor.submit(EasyMock.<Runnable>anyObject());
+ expectLastCall().andAnswer(
+ new IAnswer<Future<?>>() {
+ @Override
+ public Future<?> answer() {
+ Runnable work = (Runnable) EasyMock.getCurrentArguments()[0];
+ work.run();
+ return null;
+ }
+ }
+ );
+
+ IJobKey jobKey = Iterables.getOnlyElement(
+ FluentIterable.from(tasksInJob).transform(Tasks.SCHEDULED_TO_JOB_KEY).toSet());
+ storageUtil.expectTaskFetch(HistoryPruner.jobHistoryQuery(jobKey), tasksInJob);
+ if (pruned.length > 0) {
+ stateManager.deleteTasks(Tasks.ids(pruned));
+ }
}
- private Capture<Runnable> expectTaskWatch(long timestampMillis, int count) {
+ private Capture<Runnable> expectDelayedPrune(long timestampMillis, int count) {
Capture<Runnable> capture = createCapture();
executor.schedule(
EasyMock.capture(capture),
@@ -370,18 +345,17 @@ public class HistoryPrunerTest extends EasyMockTest {
return capture;
}
- private IExpectationSetters<?> expectCancelFuture() {
- return expect(future.cancel(false)).andReturn(true);
+ private void changeState(IScheduledTask oldStateTask, IScheduledTask newStateTask) {
+ pruner.recordStateChange(TaskStateChange.transition(newStateTask, oldStateTask.getStatus()));
}
- private IScheduledTask changeState(ScheduleStatus from, ScheduleStatus to) {
- return changeState(TASK_ID, from, to);
+ private void changeState(IScheduledTask oldStateTask, ScheduleStatus status) {
+ pruner.recordStateChange(
+ TaskStateChange.transition(copy(oldStateTask, status), oldStateTask.getStatus()));
}
- private IScheduledTask changeState(String taskId, ScheduleStatus from, ScheduleStatus to) {
- IScheduledTask task = makeTask(taskId, to);
- pruner.recordStateChange(TaskStateChange.transition(task, from));
- return task;
+ private IScheduledTask copy(IScheduledTask task, ScheduleStatus status) {
+ return IScheduledTask.build(task.newBuilder().setStatus(status));
}
private IScheduledTask makeTask(
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/bb52bcbf/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java b/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java
index 71d6a9e..bd1f599 100644
--- a/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/async/TaskTimeoutTest.java
@@ -15,16 +15,13 @@
*/
package org.apache.aurora.scheduler.async;
-import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.base.Optional;
-import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Maps;
import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;
import com.twitter.common.stats.StatsProvider;
@@ -41,13 +38,10 @@ import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.easymock.Capture;
import org.easymock.EasyMock;
-import org.easymock.IExpectationSetters;
-import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
-import static org.apache.aurora.gen.ScheduleStatus.DRAINING;
import static org.apache.aurora.gen.ScheduleStatus.FINISHED;
import static org.apache.aurora.gen.ScheduleStatus.INIT;
import static org.apache.aurora.gen.ScheduleStatus.KILLED;
@@ -55,7 +49,6 @@ import static org.apache.aurora.gen.ScheduleStatus.KILLING;
import static org.apache.aurora.gen.ScheduleStatus.LOST;
import static org.apache.aurora.gen.ScheduleStatus.PENDING;
import static org.apache.aurora.gen.ScheduleStatus.PREEMPTING;
-import static org.apache.aurora.gen.ScheduleStatus.RESTARTING;
import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
import static org.apache.aurora.gen.ScheduleStatus.STARTING;
import static org.easymock.EasyMock.eq;
@@ -69,9 +62,6 @@ public class TaskTimeoutTest extends EasyMockTest {
private static final long TIMEOUT_MS = Amount.of(1L, Time.MINUTES).as(Time.MILLISECONDS);
private AtomicLong timedOutTaskCounter;
- private Capture<Supplier<Number>> stateCountCapture;
- private Map<ScheduleStatus, Capture<Supplier<Number>>> stateCaptures;
-
private ScheduledExecutorService executor;
private ScheduledFuture<?> future;
private StateManager stateManager;
@@ -86,33 +76,9 @@ public class TaskTimeoutTest extends EasyMockTest {
stateManager = createMock(StateManager.class);
clock = new FakeClock();
statsProvider = createMock(StatsProvider.class);
- expectStatsProvider();
- }
-
- @After
- public void verifyTasksDepleted() {
- // Verify there is no memory leak.
- assertEquals(0, stateCountCapture.getValue().get().intValue());
- }
-
- private void expectStatsProvider() {
timedOutTaskCounter = new AtomicLong();
expect(statsProvider.makeCounter(TaskTimeout.TIMED_OUT_TASKS_COUNTER))
.andReturn(timedOutTaskCounter);
-
- stateCountCapture = createCapture();
- expect(statsProvider.makeGauge(
- eq(TaskTimeout.TRANSIENT_COUNT_STAT_NAME),
- EasyMock.capture(stateCountCapture))).andReturn(null);
-
- stateCaptures = Maps.newHashMap();
- for (ScheduleStatus status : TaskTimeout.TRANSIENT_STATES) {
- Capture<Supplier<Number>> statusCapture = createCapture();
- expect(statsProvider.makeGauge(
- eq(TaskTimeout.waitingTimeStatName(status)),
- EasyMock.capture(statusCapture))).andReturn(null);
- stateCaptures.put(status, statusCapture);
- }
}
private void replayAndCreate() {
@@ -120,7 +86,6 @@ public class TaskTimeoutTest extends EasyMockTest {
timeout = new TaskTimeout(
executor,
stateManager,
- clock,
Amount.of(TIMEOUT_MS, Time.MILLISECONDS),
statsProvider);
}
@@ -139,10 +104,6 @@ public class TaskTimeoutTest extends EasyMockTest {
return expectTaskWatch(TIMEOUT_MS);
}
- private IExpectationSetters<?> expectCancel() {
- return expect(future.cancel(false)).andReturn(true);
- }
-
private void changeState(String taskId, ScheduleStatus from, ScheduleStatus to) {
IScheduledTask task = IScheduledTask.build(new ScheduledTask()
.setStatus(to)
@@ -157,9 +118,7 @@ public class TaskTimeoutTest extends EasyMockTest {
@Test
public void testNormalTransitions() {
expectTaskWatch();
- expectCancel();
expectTaskWatch();
- expectCancel();
replayAndCreate();
@@ -174,7 +133,6 @@ public class TaskTimeoutTest extends EasyMockTest {
@Test
public void testTransientToTransient() {
expectTaskWatch();
- expectCancel();
Capture<Runnable> killingTimeout = expectTaskWatch();
expect(stateManager.changeState(
TASK_ID,
@@ -241,11 +199,9 @@ public class TaskTimeoutTest extends EasyMockTest {
@Test
public void testStorageStart() {
-
expectTaskWatch(TIMEOUT_MS);
expectTaskWatch(TIMEOUT_MS);
expectTaskWatch(TIMEOUT_MS);
- expectCancel().times(3);
replayAndCreate();
@@ -262,56 +218,4 @@ public class TaskTimeoutTest extends EasyMockTest {
changeState("b", KILLING, KILLED);
changeState("c", PREEMPTING, FINISHED);
}
-
- private void checkOutstandingTimer(ScheduleStatus status, long expectedValue) {
- long value = stateCaptures.get(status).getValue().get().longValue();
- assertEquals(expectedValue, value);
- }
-
- @Test
- public void testOutstandingTimers() throws Exception {
- expectTaskWatch();
- expectTaskWatch();
- expectCancel();
- expectTaskWatch();
- expectCancel().times(2);
-
- replayAndCreate();
-
- checkOutstandingTimer(ASSIGNED, 0);
- checkOutstandingTimer(PREEMPTING, 0);
- checkOutstandingTimer(RESTARTING, 0);
- checkOutstandingTimer(DRAINING, 0);
- checkOutstandingTimer(KILLING, 0);
-
- changeState("a", PENDING, ASSIGNED);
-
- Amount<Long, Time> tick = Amount.of(10L, Time.SECONDS);
- clock.advance(tick);
-
- checkOutstandingTimer(ASSIGNED, tick.as(Time.MILLISECONDS));
-
- clock.advance(tick);
- changeState("b", PENDING, ASSIGNED);
-
- clock.advance(tick);
- checkOutstandingTimer(ASSIGNED, tick.as(Time.MILLISECONDS) * 3);
-
- changeState("a", ASSIGNED, RUNNING);
- clock.advance(tick);
- changeState("a", RUNNING, KILLING);
- clock.advance(tick);
-
- checkOutstandingTimer(ASSIGNED, tick.as(Time.MILLISECONDS) * 3);
- checkOutstandingTimer(KILLING, tick.as(Time.MILLISECONDS));
-
- changeState("a", KILLING, KILLED);
- changeState("b", ASSIGNED, FINISHED);
-
- checkOutstandingTimer(ASSIGNED, 0);
- checkOutstandingTimer(PREEMPTING, 0);
- checkOutstandingTimer(RESTARTING, 0);
- checkOutstandingTimer(DRAINING, 0);
- checkOutstandingTimer(KILLING, 0);
- }
}