You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by js...@apache.org on 2016/01/22 22:50:56 UTC

aurora git commit: Simplify TaskHistoryPruner tie-in to Lifecycle.

Repository: aurora
Updated Branches:
  refs/heads/master 2da17009c -> eae686023


Simplify TaskHistoryPruner tie-in to Lifecycle.

This eliminates processing all futures to find the 1st failed one in
favor of directly signalling a Service failure when a unit of async work
fails.

Testing Done:
Locally green: `./gradlew -P build`.

Bugs closed: AURORA-1582

Reviewed at https://reviews.apache.org/r/42639/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/eae68602
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/eae68602
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/eae68602

Branch: refs/heads/master
Commit: eae6860232a1ce10615023d0833c269bb5e85356
Parents: 2da1700
Author: John Sirois <js...@apache.org>
Authored: Fri Jan 22 14:50:54 2016 -0700
Committer: John Sirois <js...@apache.org>
Committed: Fri Jan 22 14:50:54 2016 -0700

----------------------------------------------------------------------
 src/main/java/org/apache/aurora/GuavaUtils.java | 18 +++++++
 .../scheduler/pruning/TaskHistoryPruner.java    | 56 +++++++-------------
 .../aurora/LifecycleShutdownListenerTest.java   | 17 +-----
 3 files changed, 40 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/eae68602/src/main/java/org/apache/aurora/GuavaUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/GuavaUtils.java b/src/main/java/org/apache/aurora/GuavaUtils.java
index 8c2ab57..7d569e0 100644
--- a/src/main/java/org/apache/aurora/GuavaUtils.java
+++ b/src/main/java/org/apache/aurora/GuavaUtils.java
@@ -20,6 +20,7 @@ import java.util.stream.Collector;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMultimap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.AbstractService;
 import com.google.common.util.concurrent.Service;
 import com.google.common.util.concurrent.Service.State;
 import com.google.common.util.concurrent.ServiceManager;
@@ -56,6 +57,23 @@ public final class GuavaUtils {
   }
 
   /**
+   * A Service that does nothing; useful for building passive services driven by an external
+   * event loop.
+   */
+  public static class PassiveService extends AbstractService {
+
+    @Override
+    protected void doStart() {
+      notifyStarted();
+    }
+
+    @Override
+    protected void doStop() {
+      notifyStopped();
+    }
+  }
+
+  /**
    * Collector to create a Guava ImmutableSet.
    */
   public static <T> Collector<T, ?, ImmutableSet<T>> toImmutableSet() {

http://git-wip-us.apache.org/repos/asf/aurora/blob/eae68602/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
index 2d4c58e..5441630 100644
--- a/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
+++ b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
@@ -14,9 +14,6 @@
 package org.apache.aurora.scheduler.pruning;
 
 import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.TimeUnit;
 
 import javax.inject.Inject;
 
@@ -25,10 +22,9 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Queues;
 import com.google.common.eventbus.Subscribe;
-import com.google.common.util.concurrent.AbstractScheduledService;
 
+import org.apache.aurora.GuavaUtils.PassiveService;
 import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Time;
 import org.apache.aurora.common.util.Clock;
@@ -56,7 +52,7 @@ import static org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
  * Prunes tasks in a job based on per-job history and an inactive time threshold by observing tasks
  * transitioning into one of the inactive states.
  */
-public class TaskHistoryPruner extends AbstractScheduledService implements EventSubscriber {
+public class TaskHistoryPruner extends PassiveService implements EventSubscriber {
   private static final Logger LOG = LoggerFactory.getLogger(TaskHistoryPruner.class);
 
   private final DelayExecutor executor;
@@ -64,7 +60,6 @@ public class TaskHistoryPruner extends AbstractScheduledService implements Event
   private final Clock clock;
   private final HistoryPrunnerSettings settings;
   private final Storage storage;
-  private final ConcurrentLinkedQueue<FutureTask<Void>> futureTasks;
 
   private final Predicate<IScheduledTask> safeToDelete = new Predicate<IScheduledTask>() {
     @Override
@@ -103,7 +98,6 @@ public class TaskHistoryPruner extends AbstractScheduledService implements Event
     this.clock = requireNonNull(clock);
     this.settings = requireNonNull(settings);
     this.storage = requireNonNull(storage);
-    this.futureTasks = Queues.newConcurrentLinkedQueue();
   }
 
   @VisibleForTesting
@@ -133,22 +127,6 @@ public class TaskHistoryPruner extends AbstractScheduledService implements Event
     }
   }
 
-  @Override
-  protected void runOneIteration() throws Exception {
-    // Check if the prune attempts fail and propagate the exception. This will trigger
-    // service (and the scheduler) to shut down.
-    FutureTask<Void> future;
-
-    while ((future = futureTasks.poll()) != null) {
-      future.get();
-    }
-  }
-
-  @Override
-  protected Scheduler scheduler() {
-    return AbstractScheduledService.Scheduler.newFixedDelaySchedule(0, 5, TimeUnit.SECONDS);
-  }
-
   private void deleteTasks(final Set<String> taskIds) {
     LOG.info("Pruning inactive tasks " + taskIds);
     storage.write(
@@ -160,6 +138,16 @@ public class TaskHistoryPruner extends AbstractScheduledService implements Event
     return Query.jobScoped(jobKey).byStatus(apiConstants.TERMINAL_STATES);
   }
 
+  private Runnable failureNotifyingRunnable(Runnable runnable) {
+    return () -> {
+      try {
+        runnable.run();
+      } catch (Throwable t) {
+        notifyFailed(t);
+      }
+    };
+  }
+
   private void registerInactiveTask(
       final IJobKey jobKey,
       final String taskId,
@@ -167,15 +155,14 @@ public class TaskHistoryPruner extends AbstractScheduledService implements Event
 
     LOG.debug("Prune task " + taskId + " in " + timeRemaining + " ms.");
 
-    FutureTask<Void> pruneSingleTask = new FutureTask<>(() -> {
-      LOG.info("Pruning expired inactive task " + taskId);
-      deleteTasks(ImmutableSet.of(taskId));
-    }, null);
-    futureTasks.add(pruneSingleTask);
+    executor.execute(
+        failureNotifyingRunnable(() -> {
+          LOG.info("Pruning expired inactive task " + taskId);
+          deleteTasks(ImmutableSet.of(taskId));
+        }),
+        Amount.of(timeRemaining, Time.MILLISECONDS));
 
-    executor.execute(pruneSingleTask, Amount.of(timeRemaining, Time.MILLISECONDS));
-
-    FutureTask<Void> pruneRemainingTasksFromJob = new FutureTask<>(() -> {
+    executor.execute(failureNotifyingRunnable(() -> {
       Iterable<IScheduledTask> inactiveTasks =
           Storage.Util.fetchTasks(storage, jobHistoryQuery(jobKey));
       int numInactiveTasks = Iterables.size(inactiveTasks);
@@ -191,9 +178,6 @@ public class TaskHistoryPruner extends AbstractScheduledService implements Event
           deleteTasks(toPrune);
         }
       }
-    }, null);
-    futureTasks.add(pruneRemainingTasksFromJob);
-
-    executor.execute(pruneRemainingTasksFromJob);
+    }));
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/eae68602/src/test/java/org/apache/aurora/LifecycleShutdownListenerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/LifecycleShutdownListenerTest.java b/src/test/java/org/apache/aurora/LifecycleShutdownListenerTest.java
index 8d19c04..d91dc27 100644
--- a/src/test/java/org/apache/aurora/LifecycleShutdownListenerTest.java
+++ b/src/test/java/org/apache/aurora/LifecycleShutdownListenerTest.java
@@ -13,11 +13,10 @@
  */
 package org.apache.aurora;
 
-import com.google.common.util.concurrent.AbstractService;
-import com.google.common.util.concurrent.Service;
 import com.google.common.util.concurrent.ServiceManager;
 
 import org.apache.aurora.GuavaUtils.LifecycleShutdownListener;
+import org.apache.aurora.GuavaUtils.PassiveService;
 import org.apache.aurora.common.application.Lifecycle;
 import org.apache.aurora.common.base.Command;
 import org.apache.aurora.common.testing.easymock.EasyMockTest;
@@ -26,18 +25,6 @@ import org.junit.Test;
 
 public class LifecycleShutdownListenerTest extends EasyMockTest {
 
-  private static final Service NOOP_SERVICE = new AbstractService() {
-    @Override
-    protected void doStart() {
-      // Noop.
-    }
-
-    @Override
-    protected void doStop() {
-      // Noop.
-    }
-  };
-
   private Command shutdown;
   private ServiceManager.Listener listener;
 
@@ -53,6 +40,6 @@ public class LifecycleShutdownListenerTest extends EasyMockTest {
 
     control.replay();
 
-    listener.failure(NOOP_SERVICE);
+    listener.failure(new PassiveService());
   }
 }