You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by zm...@apache.org on 2016/01/22 02:38:35 UTC

aurora git commit: Turn TaskHistoryPruner into a service and trigger shutdown on pruning failure.

Repository: aurora
Updated Branches:
  refs/heads/master a2c7ccc17 -> c89fecbcd


Turn TaskHistoryPruner into a service and trigger shutdown on pruning failure.

Task pruning is key to operating a large cluster, and a failure to prune should
trigger shutdown to prevent unbounded growth of storage. This patch turns
`TaskHistoryPruner` into a service which propagates failures from pruning
attempts to the `ServiceManager`. It also completes a TODO by removing a
test for behaviour that is very awkward to test.

Bugs closed: AURORA-1582

Reviewed at https://reviews.apache.org/r/42332/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/c89fecbc
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/c89fecbc
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/c89fecbc

Branch: refs/heads/master
Commit: c89fecbcd93aa6ddcf6af60f2a2cb6315b7a4d19
Parents: a2c7ccc
Author: Zameer Manji <zm...@apache.org>
Authored: Thu Jan 21 17:38:25 2016 -0800
Committer: Zameer Manji <zm...@apache.org>
Committed: Thu Jan 21 17:38:25 2016 -0800

----------------------------------------------------------------------
 .../aurora/scheduler/pruning/PruningModule.java |   1 +
 .../scheduler/pruning/TaskHistoryPruner.java    |  52 ++++++--
 .../pruning/TaskHistoryPrunerTest.java          | 126 ++++---------------
 3 files changed, 68 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/c89fecbc/src/main/java/org/apache/aurora/scheduler/pruning/PruningModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/pruning/PruningModule.java b/src/main/java/org/apache/aurora/scheduler/pruning/PruningModule.java
index 735199a..efdfbda 100644
--- a/src/main/java/org/apache/aurora/scheduler/pruning/PruningModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/pruning/PruningModule.java
@@ -83,6 +83,7 @@ public class PruningModule extends AbstractModule {
         expose(TaskHistoryPruner.class);
       }
     });
+    SchedulerServicesModule.addSchedulerActiveServiceBinding(binder()).to(TaskHistoryPruner.class);
     PubsubEventModule.bindSubscriber(binder(), TaskHistoryPruner.class);
 
     install(new PrivateModule() {

http://git-wip-us.apache.org/repos/asf/aurora/blob/c89fecbc/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
index 2064089..2d4c58e 100644
--- a/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
+++ b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
@@ -14,6 +14,9 @@
 package org.apache.aurora.scheduler.pruning;
 
 import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
 
 import javax.inject.Inject;
 
@@ -22,7 +25,9 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Queues;
 import com.google.common.eventbus.Subscribe;
+import com.google.common.util.concurrent.AbstractScheduledService;
 
 import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Time;
@@ -42,6 +47,8 @@ import org.slf4j.LoggerFactory;
 
 import static java.util.Objects.requireNonNull;
 
+import static com.google.common.base.Preconditions.checkState;
+
 import static org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
 import static org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 
@@ -49,7 +56,7 @@ import static org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
  * Prunes tasks in a job based on per-job history and an inactive time threshold by observing tasks
  * transitioning into one of the inactive states.
  */
-public class TaskHistoryPruner implements EventSubscriber {
+public class TaskHistoryPruner extends AbstractScheduledService implements EventSubscriber {
   private static final Logger LOG = LoggerFactory.getLogger(TaskHistoryPruner.class);
 
   private final DelayExecutor executor;
@@ -57,6 +64,7 @@ public class TaskHistoryPruner implements EventSubscriber {
   private final Clock clock;
   private final HistoryPrunnerSettings settings;
   private final Storage storage;
+  private final ConcurrentLinkedQueue<FutureTask<Void>> futureTasks;
 
   private final Predicate<IScheduledTask> safeToDelete = new Predicate<IScheduledTask>() {
     @Override
@@ -95,6 +103,7 @@ public class TaskHistoryPruner implements EventSubscriber {
     this.clock = requireNonNull(clock);
     this.settings = requireNonNull(settings);
     this.storage = requireNonNull(storage);
+    this.futureTasks = Queues.newConcurrentLinkedQueue();
   }
 
   @VisibleForTesting
@@ -111,6 +120,8 @@ public class TaskHistoryPruner implements EventSubscriber {
    */
   @Subscribe
   public void recordStateChange(TaskStateChange change) {
+    checkState(isRunning());
+
     if (Tasks.isTerminated(change.getNewState())) {
       long timeoutBasis = change.isTransition()
           ? clock.nowMillis()
@@ -122,6 +133,22 @@ public class TaskHistoryPruner implements EventSubscriber {
     }
   }
 
+  @Override
+  protected void runOneIteration() throws Exception {
+    // Drain the queued prune attempts and let the first failure propagate out of this
+    // iteration. This will trigger the service (and the scheduler) to shut down.
+    FutureTask<Void> future;
+
+    while ((future = futureTasks.poll()) != null) {
+      future.get();
+    }
+  }
+
+  @Override
+  protected Scheduler scheduler() {
+    return AbstractScheduledService.Scheduler.newFixedDelaySchedule(0, 5, TimeUnit.SECONDS);
+  }
+
   private void deleteTasks(final Set<String> taskIds) {
     LOG.info("Pruning inactive tasks " + taskIds);
     storage.write(
@@ -139,14 +166,16 @@ public class TaskHistoryPruner implements EventSubscriber {
       long timeRemaining) {
 
     LOG.debug("Prune task " + taskId + " in " + timeRemaining + " ms.");
-    executor.execute(
-        () -> {
-          LOG.info("Pruning expired inactive task " + taskId);
-          deleteTasks(ImmutableSet.of(taskId));
-        },
-        Amount.of(timeRemaining, Time.MILLISECONDS));
-
-    executor.execute(() -> {
+
+    FutureTask<Void> pruneSingleTask = new FutureTask<>(() -> {
+      LOG.info("Pruning expired inactive task " + taskId);
+      deleteTasks(ImmutableSet.of(taskId));
+    }, null);
+    futureTasks.add(pruneSingleTask);
+
+    executor.execute(pruneSingleTask, Amount.of(timeRemaining, Time.MILLISECONDS));
+
+    FutureTask<Void> pruneRemainingTasksFromJob = new FutureTask<>(() -> {
       Iterable<IScheduledTask> inactiveTasks =
           Storage.Util.fetchTasks(storage, jobHistoryQuery(jobKey));
       int numInactiveTasks = Iterables.size(inactiveTasks);
@@ -162,6 +191,9 @@ public class TaskHistoryPruner implements EventSubscriber {
           deleteTasks(toPrune);
         }
       }
-    });
+    }, null);
+    futureTasks.add(pruneRemainingTasksFromJob);
+
+    executor.execute(pruneRemainingTasksFromJob);
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/c89fecbc/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java b/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java
index 295960f..e1b5391 100644
--- a/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java
@@ -13,26 +13,15 @@
  */
 package org.apache.aurora.scheduler.pruning;
 
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.io.Closer;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.inject.AbstractModule;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import com.google.inject.Key;
-
-import org.apache.aurora.common.base.Command;
+import com.google.common.util.concurrent.Service;
+
 import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.common.stats.StatsProvider;
 import org.apache.aurora.common.testing.easymock.EasyMockTest;
 import org.apache.aurora.common.util.testing.FakeClock;
 import org.apache.aurora.gen.AssignedTask;
@@ -43,8 +32,6 @@ import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
 import org.apache.aurora.gen.TaskEvent;
-import org.apache.aurora.scheduler.async.AsyncModule;
-import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
 import org.apache.aurora.scheduler.async.DelayExecutor;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
@@ -53,14 +40,12 @@ import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
-import org.apache.aurora.scheduler.testing.FakeStatsProvider;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import static org.apache.aurora.gen.ScheduleStatus.ASSIGNED;
 import static org.apache.aurora.gen.ScheduleStatus.FINISHED;
 import static org.apache.aurora.gen.ScheduleStatus.KILLED;
 import static org.apache.aurora.gen.ScheduleStatus.LOST;
@@ -68,11 +53,12 @@ import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
 import static org.apache.aurora.gen.ScheduleStatus.STARTING;
 import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
 
 public class TaskHistoryPrunerTest extends EasyMockTest {
   private static final String JOB_A = "job-a";
-  private static final String TASK_ID = "task_id";
   private static final String SLAVE_HOST = "HOST_A";
   private static final Amount<Long, Time> ONE_MS = Amount.of(1L, Time.MILLISECONDS);
   private static final Amount<Long, Time> ONE_MINUTE = Amount.of(1L, Time.MINUTES);
@@ -101,6 +87,8 @@ public class TaskHistoryPrunerTest extends EasyMockTest {
         new HistoryPrunnerSettings(ONE_DAY, ONE_MINUTE, PER_JOB_HISTORY),
         storageUtil.storage);
     closer = Closer.create();
+
+    pruner.startAsync().awaitRunning();
   }
 
   @After
@@ -249,89 +237,30 @@ public class TaskHistoryPrunerTest extends EasyMockTest {
     changeState(d, dLost);
   }
 
-  // TODO(William Farner): Consider removing the thread safety tests.  Now that intrinsic locks
-  // are not used, it is rather awkward to test this.
   @Test
-  public void testThreadSafeStateChangeEvent() throws Exception {
-    // This tests against regression where an executor pruning a task holds an intrinsic lock and
-    // an unrelated task state change in the scheduler fires an event that requires this intrinsic
-    // lock. This causes a deadlock when the executor tries to acquire a lock held by the event
-    // fired.
-
-    ScheduledThreadPoolExecutor realExecutor = new ScheduledThreadPoolExecutor(1,
-        new ThreadFactoryBuilder()
-            .setDaemon(true)
-            .setNameFormat("testThreadSafeEvents-executor")
-            .build());
-    closer.register(
-        () -> MoreExecutors.shutdownAndAwaitTermination(realExecutor, 1L, TimeUnit.SECONDS));
-
-    Injector injector = Guice.createInjector(
-        new AsyncModule(realExecutor),
-        new AbstractModule() {
-          @Override
-          protected void configure() {
-            bind(StatsProvider.class).toInstance(new FakeStatsProvider());
-          }
-        });
-    executor = injector.getInstance(Key.get(DelayExecutor.class, AsyncExecutor.class));
-
-    pruner = buildPruner(executor);
-    // The goal is to verify that the call does not deadlock. We do not care about the outcome.
-    Command onDeleted = () -> changeState(makeTask("b", ASSIGNED), STARTING);
-    CountDownLatch taskDeleted = expectTaskDeleted(onDeleted, TASK_ID);
+  public void serviceShutdownOnFailure() {
+    IScheduledTask running = makeTask("a", RUNNING);
+    IScheduledTask killed = copy(running, KILLED);
+    expectNoImmediatePrune(ImmutableSet.of(running));
+    Capture<Runnable> delayedDelete = expectDefaultDelayedPrune();
 
-    control.replay();
+    expectDeleteTasks("a");
+    expectLastCall().andThrow(new RuntimeException("oops"));
 
-    // Change the task to a terminal state and wait for it to be pruned.
-    changeState(makeTask(TASK_ID, RUNNING), KILLED);
-    taskDeleted.await();
-  }
+    control.replay();
 
-  private TaskHistoryPruner buildPruner(DelayExecutor delayExecutor) {
-    return new TaskHistoryPruner(
-        delayExecutor,
-        stateManager,
-        clock,
-        new HistoryPrunnerSettings(Amount.of(1L, Time.MILLISECONDS), ONE_MS, PER_JOB_HISTORY),
-        storageUtil.storage);
-  }
+    changeState(running, killed);
+    clock.advance(ONE_HOUR);
+    delayedDelete.getValue().run();
+    // awaitTerminated throws an IllegalStateException if the service fails
+    try {
+      pruner.awaitTerminated();
+      fail();
+    } catch (IllegalStateException e) {
+      assertEquals(Service.State.FAILED, pruner.state());
+    }
 
-  private CountDownLatch expectTaskDeleted(Command onDelete, String taskId) {
-    CountDownLatch deleteCalled = new CountDownLatch(1);
-    CountDownLatch eventDelivered = new CountDownLatch(1);
-
-    Thread eventDispatch = new Thread() {
-      @Override
-      public void run() {
-        try {
-          deleteCalled.await();
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          fail("Interrupted while awaiting for delete call.");
-          return;
-        }
-        onDelete.execute();
-        eventDelivered.countDown();
-      }
-    };
-    eventDispatch.setDaemon(true);
-    eventDispatch.setName(getClass().getName() + "-EventDispatch");
-    eventDispatch.start();
-
-    stateManager.deleteTasks(storageUtil.mutableStoreProvider, ImmutableSet.of(taskId));
-    expectLastCall().andAnswer(() -> {
-      deleteCalled.countDown();
-      try {
-        eventDelivered.await();
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        fail("Interrupted while awaiting for event delivery.");
-      }
-      return null;
-    });
-
-    return eventDelivered;
+    assertNotNull(pruner.failureCause());
   }
 
   private void expectDeleteTasks(String... tasks) {
@@ -384,11 +313,6 @@ public class TaskHistoryPrunerTest extends EasyMockTest {
     pruner.recordStateChange(TaskStateChange.transition(newStateTask, oldStateTask.getStatus()));
   }
 
-  private void changeState(IScheduledTask oldStateTask, ScheduleStatus status) {
-    pruner.recordStateChange(
-        TaskStateChange.transition(copy(oldStateTask, status), oldStateTask.getStatus()));
-  }
-
   private IScheduledTask copy(IScheduledTask task, ScheduleStatus status) {
     return IScheduledTask.build(task.newBuilder().setStatus(status));
   }