You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@aurora.apache.org by zm...@apache.org on 2016/01/22 02:38:35 UTC
aurora git commit: Turn TaskHistoryPruner into a service and trigger shutdown on pruning failure.
Repository: aurora
Updated Branches:
refs/heads/master a2c7ccc17 -> c89fecbcd
Turn TaskHistoryPruner into a service and trigger shutdown on pruning failure.
Task pruning is key to operating a large cluster, and a failure to prune should
trigger a shutdown to prevent unbounded growth of storage. This patch turns
`TaskHistoryPruner` into a service that propagates failures from pruning
attempts to the `ServiceManager`. It also completes a TODO by removing a
test for behaviour that is very awkward to test.
Bugs closed: AURORA-1582
Reviewed at https://reviews.apache.org/r/42332/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/c89fecbc
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/c89fecbc
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/c89fecbc
Branch: refs/heads/master
Commit: c89fecbcd93aa6ddcf6af60f2a2cb6315b7a4d19
Parents: a2c7ccc
Author: Zameer Manji <zm...@apache.org>
Authored: Thu Jan 21 17:38:25 2016 -0800
Committer: Zameer Manji <zm...@apache.org>
Committed: Thu Jan 21 17:38:25 2016 -0800
----------------------------------------------------------------------
.../aurora/scheduler/pruning/PruningModule.java | 1 +
.../scheduler/pruning/TaskHistoryPruner.java | 52 ++++++--
.../pruning/TaskHistoryPrunerTest.java | 126 ++++---------------
3 files changed, 68 insertions(+), 111 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/c89fecbc/src/main/java/org/apache/aurora/scheduler/pruning/PruningModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/pruning/PruningModule.java b/src/main/java/org/apache/aurora/scheduler/pruning/PruningModule.java
index 735199a..efdfbda 100644
--- a/src/main/java/org/apache/aurora/scheduler/pruning/PruningModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/pruning/PruningModule.java
@@ -83,6 +83,7 @@ public class PruningModule extends AbstractModule {
expose(TaskHistoryPruner.class);
}
});
+ SchedulerServicesModule.addSchedulerActiveServiceBinding(binder()).to(TaskHistoryPruner.class);
PubsubEventModule.bindSubscriber(binder(), TaskHistoryPruner.class);
install(new PrivateModule() {
http://git-wip-us.apache.org/repos/asf/aurora/blob/c89fecbc/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
index 2064089..2d4c58e 100644
--- a/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
+++ b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
@@ -14,6 +14,9 @@
package org.apache.aurora.scheduler.pruning;
import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
@@ -22,7 +25,9 @@ import com.google.common.base.Predicate;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Queues;
import com.google.common.eventbus.Subscribe;
+import com.google.common.util.concurrent.AbstractScheduledService;
import org.apache.aurora.common.quantity.Amount;
import org.apache.aurora.common.quantity.Time;
@@ -42,6 +47,8 @@ import org.slf4j.LoggerFactory;
import static java.util.Objects.requireNonNull;
+import static com.google.common.base.Preconditions.checkState;
+
import static org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
import static org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
@@ -49,7 +56,7 @@ import static org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
* Prunes tasks in a job based on per-job history and an inactive time threshold by observing tasks
* transitioning into one of the inactive states.
*/
-public class TaskHistoryPruner implements EventSubscriber {
+public class TaskHistoryPruner extends AbstractScheduledService implements EventSubscriber {
private static final Logger LOG = LoggerFactory.getLogger(TaskHistoryPruner.class);
private final DelayExecutor executor;
@@ -57,6 +64,7 @@ public class TaskHistoryPruner implements EventSubscriber {
private final Clock clock;
private final HistoryPrunnerSettings settings;
private final Storage storage;
+ private final ConcurrentLinkedQueue<FutureTask<Void>> futureTasks;
private final Predicate<IScheduledTask> safeToDelete = new Predicate<IScheduledTask>() {
@Override
@@ -95,6 +103,7 @@ public class TaskHistoryPruner implements EventSubscriber {
this.clock = requireNonNull(clock);
this.settings = requireNonNull(settings);
this.storage = requireNonNull(storage);
+ this.futureTasks = Queues.newConcurrentLinkedQueue();
}
@VisibleForTesting
@@ -111,6 +120,8 @@ public class TaskHistoryPruner implements EventSubscriber {
*/
@Subscribe
public void recordStateChange(TaskStateChange change) {
+ checkState(isRunning());
+
if (Tasks.isTerminated(change.getNewState())) {
long timeoutBasis = change.isTransition()
? clock.nowMillis()
@@ -122,6 +133,22 @@ public class TaskHistoryPruner implements EventSubscriber {
}
}
+ @Override
+ protected void runOneIteration() throws Exception {
+ // Check if the prune attempts fail and propagate the exception. This will trigger
+ // service (and the scheduler) to shut down.
+ FutureTask<Void> future;
+
+ while ((future = futureTasks.poll()) != null) {
+ future.get();
+ }
+ }
+
+ @Override
+ protected Scheduler scheduler() {
+ return AbstractScheduledService.Scheduler.newFixedDelaySchedule(0, 5, TimeUnit.SECONDS);
+ }
+
private void deleteTasks(final Set<String> taskIds) {
LOG.info("Pruning inactive tasks " + taskIds);
storage.write(
@@ -139,14 +166,16 @@ public class TaskHistoryPruner implements EventSubscriber {
long timeRemaining) {
LOG.debug("Prune task " + taskId + " in " + timeRemaining + " ms.");
- executor.execute(
- () -> {
- LOG.info("Pruning expired inactive task " + taskId);
- deleteTasks(ImmutableSet.of(taskId));
- },
- Amount.of(timeRemaining, Time.MILLISECONDS));
-
- executor.execute(() -> {
+
+ FutureTask<Void> pruneSingleTask = new FutureTask<>(() -> {
+ LOG.info("Pruning expired inactive task " + taskId);
+ deleteTasks(ImmutableSet.of(taskId));
+ }, null);
+ futureTasks.add(pruneSingleTask);
+
+ executor.execute(pruneSingleTask, Amount.of(timeRemaining, Time.MILLISECONDS));
+
+ FutureTask<Void> pruneRemainingTasksFromJob = new FutureTask<>(() -> {
Iterable<IScheduledTask> inactiveTasks =
Storage.Util.fetchTasks(storage, jobHistoryQuery(jobKey));
int numInactiveTasks = Iterables.size(inactiveTasks);
@@ -162,6 +191,9 @@ public class TaskHistoryPruner implements EventSubscriber {
deleteTasks(toPrune);
}
}
- });
+ }, null);
+ futureTasks.add(pruneRemainingTasksFromJob);
+
+ executor.execute(pruneRemainingTasksFromJob);
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/c89fecbc/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java b/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java
index 295960f..e1b5391 100644
--- a/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java
@@ -13,26 +13,15 @@
*/
package org.apache.aurora.scheduler.pruning;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.io.Closer;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.inject.AbstractModule;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import com.google.inject.Key;
-
-import org.apache.aurora.common.base.Command;
+import com.google.common.util.concurrent.Service;
+
import org.apache.aurora.common.quantity.Amount;
import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.common.stats.StatsProvider;
import org.apache.aurora.common.testing.easymock.EasyMockTest;
import org.apache.aurora.common.util.testing.FakeClock;
import org.apache.aurora.gen.AssignedTask;
@@ -43,8 +32,6 @@ import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.gen.TaskConfig;
import org.apache.aurora.gen.TaskEvent;
-import org.apache.aurora.scheduler.async.AsyncModule;
-import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
import org.apache.aurora.scheduler.async.DelayExecutor;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
@@ -53,14 +40,12 @@ import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
-import org.apache.aurora.scheduler.testing.FakeStatsProvider;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
import static org.apache.aurora.gen.ScheduleStatus.FINISHED;
import static org.apache.aurora.gen.ScheduleStatus.KILLED;
import static org.apache.aurora.gen.ScheduleStatus.LOST;
@@ -68,11 +53,12 @@ import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
import static org.apache.aurora.gen.ScheduleStatus.STARTING;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
public class TaskHistoryPrunerTest extends EasyMockTest {
private static final String JOB_A = "job-a";
- private static final String TASK_ID = "task_id";
private static final String SLAVE_HOST = "HOST_A";
private static final Amount<Long, Time> ONE_MS = Amount.of(1L, Time.MILLISECONDS);
private static final Amount<Long, Time> ONE_MINUTE = Amount.of(1L, Time.MINUTES);
@@ -101,6 +87,8 @@ public class TaskHistoryPrunerTest extends EasyMockTest {
new HistoryPrunnerSettings(ONE_DAY, ONE_MINUTE, PER_JOB_HISTORY),
storageUtil.storage);
closer = Closer.create();
+
+ pruner.startAsync().awaitRunning();
}
@After
@@ -249,89 +237,30 @@ public class TaskHistoryPrunerTest extends EasyMockTest {
changeState(d, dLost);
}
- // TODO(William Farner): Consider removing the thread safety tests. Now that intrinsic locks
- // are not used, it is rather awkward to test this.
@Test
- public void testThreadSafeStateChangeEvent() throws Exception {
- // This tests against regression where an executor pruning a task holds an intrinsic lock and
- // an unrelated task state change in the scheduler fires an event that requires this intrinsic
- // lock. This causes a deadlock when the executor tries to acquire a lock held by the event
- // fired.
-
- ScheduledThreadPoolExecutor realExecutor = new ScheduledThreadPoolExecutor(1,
- new ThreadFactoryBuilder()
- .setDaemon(true)
- .setNameFormat("testThreadSafeEvents-executor")
- .build());
- closer.register(
- () -> MoreExecutors.shutdownAndAwaitTermination(realExecutor, 1L, TimeUnit.SECONDS));
-
- Injector injector = Guice.createInjector(
- new AsyncModule(realExecutor),
- new AbstractModule() {
- @Override
- protected void configure() {
- bind(StatsProvider.class).toInstance(new FakeStatsProvider());
- }
- });
- executor = injector.getInstance(Key.get(DelayExecutor.class, AsyncExecutor.class));
-
- pruner = buildPruner(executor);
- // The goal is to verify that the call does not deadlock. We do not care about the outcome.
- Command onDeleted = () -> changeState(makeTask("b", ASSIGNED), STARTING);
- CountDownLatch taskDeleted = expectTaskDeleted(onDeleted, TASK_ID);
+ public void serviceShutdownOnFailure() {
+ IScheduledTask running = makeTask("a", RUNNING);
+ IScheduledTask killed = copy(running, KILLED);
+ expectNoImmediatePrune(ImmutableSet.of(running));
+ Capture<Runnable> delayedDelete = expectDefaultDelayedPrune();
- control.replay();
+ expectDeleteTasks("a");
+ expectLastCall().andThrow(new RuntimeException("oops"));
- // Change the task to a terminal state and wait for it to be pruned.
- changeState(makeTask(TASK_ID, RUNNING), KILLED);
- taskDeleted.await();
- }
+ control.replay();
- private TaskHistoryPruner buildPruner(DelayExecutor delayExecutor) {
- return new TaskHistoryPruner(
- delayExecutor,
- stateManager,
- clock,
- new HistoryPrunnerSettings(Amount.of(1L, Time.MILLISECONDS), ONE_MS, PER_JOB_HISTORY),
- storageUtil.storage);
- }
+ changeState(running, killed);
+ clock.advance(ONE_HOUR);
+ delayedDelete.getValue().run();
+ // awaitTerminated throws an IllegalStateException if the service fails
+ try {
+ pruner.awaitTerminated();
+ fail();
+ } catch (IllegalStateException e) {
+ assertEquals(Service.State.FAILED, pruner.state());
+ }
- private CountDownLatch expectTaskDeleted(Command onDelete, String taskId) {
- CountDownLatch deleteCalled = new CountDownLatch(1);
- CountDownLatch eventDelivered = new CountDownLatch(1);
-
- Thread eventDispatch = new Thread() {
- @Override
- public void run() {
- try {
- deleteCalled.await();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- fail("Interrupted while awaiting for delete call.");
- return;
- }
- onDelete.execute();
- eventDelivered.countDown();
- }
- };
- eventDispatch.setDaemon(true);
- eventDispatch.setName(getClass().getName() + "-EventDispatch");
- eventDispatch.start();
-
- stateManager.deleteTasks(storageUtil.mutableStoreProvider, ImmutableSet.of(taskId));
- expectLastCall().andAnswer(() -> {
- deleteCalled.countDown();
- try {
- eventDelivered.await();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- fail("Interrupted while awaiting for event delivery.");
- }
- return null;
- });
-
- return eventDelivered;
+ assertNotNull(pruner.failureCause());
}
private void expectDeleteTasks(String... tasks) {
@@ -384,11 +313,6 @@ public class TaskHistoryPrunerTest extends EasyMockTest {
pruner.recordStateChange(TaskStateChange.transition(newStateTask, oldStateTask.getStatus()));
}
- private void changeState(IScheduledTask oldStateTask, ScheduleStatus status) {
- pruner.recordStateChange(
- TaskStateChange.transition(copy(oldStateTask, status), oldStateTask.getStatus()));
- }
-
private IScheduledTask copy(IScheduledTask task, ScheduleStatus status) {
return IScheduledTask.build(task.newBuilder().setStatus(status));
}