You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by js...@apache.org on 2016/01/22 22:50:56 UTC
aurora git commit: Simplify TaskHistoryPruner tie-in to Lifecycle.
Repository: aurora
Updated Branches:
refs/heads/master 2da17009c -> eae686023
Simplify TaskHistoryPruner tie-in to Lifecycle.
This eliminates processing all futures to find the 1st failed one in
favor of directly signalling a Service failure when a unit of async work
fails.
Testing Done:
Locally green: `./gradlew -P build`.
Bugs closed: AURORA-1582
Reviewed at https://reviews.apache.org/r/42639/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/eae68602
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/eae68602
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/eae68602
Branch: refs/heads/master
Commit: eae6860232a1ce10615023d0833c269bb5e85356
Parents: 2da1700
Author: John Sirois <js...@apache.org>
Authored: Fri Jan 22 14:50:54 2016 -0700
Committer: John Sirois <js...@apache.org>
Committed: Fri Jan 22 14:50:54 2016 -0700
----------------------------------------------------------------------
src/main/java/org/apache/aurora/GuavaUtils.java | 18 +++++++
.../scheduler/pruning/TaskHistoryPruner.java | 56 +++++++-------------
.../aurora/LifecycleShutdownListenerTest.java | 17 +-----
3 files changed, 40 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/eae68602/src/main/java/org/apache/aurora/GuavaUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/GuavaUtils.java b/src/main/java/org/apache/aurora/GuavaUtils.java
index 8c2ab57..7d569e0 100644
--- a/src/main/java/org/apache/aurora/GuavaUtils.java
+++ b/src/main/java/org/apache/aurora/GuavaUtils.java
@@ -20,6 +20,7 @@ import java.util.stream.Collector;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.AbstractService;
import com.google.common.util.concurrent.Service;
import com.google.common.util.concurrent.Service.State;
import com.google.common.util.concurrent.ServiceManager;
@@ -56,6 +57,23 @@ public final class GuavaUtils {
}
/**
+ * A Service that does nothing; useful for building passive services driven by an external
+ * event loop.
+ */
+ public static class PassiveService extends AbstractService {
+
+ @Override
+ protected void doStart() {
+ notifyStarted();
+ }
+
+ @Override
+ protected void doStop() {
+ notifyStopped();
+ }
+ }
+
+ /**
* Collector to create a Guava ImmutableSet.
*/
public static <T> Collector<T, ?, ImmutableSet<T>> toImmutableSet() {
http://git-wip-us.apache.org/repos/asf/aurora/blob/eae68602/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
index 2d4c58e..5441630 100644
--- a/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
+++ b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
@@ -14,9 +14,6 @@
package org.apache.aurora.scheduler.pruning;
import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
@@ -25,10 +22,9 @@ import com.google.common.base.Predicate;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
-import com.google.common.collect.Queues;
import com.google.common.eventbus.Subscribe;
-import com.google.common.util.concurrent.AbstractScheduledService;
+import org.apache.aurora.GuavaUtils.PassiveService;
import org.apache.aurora.common.quantity.Amount;
import org.apache.aurora.common.quantity.Time;
import org.apache.aurora.common.util.Clock;
@@ -56,7 +52,7 @@ import static org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
* Prunes tasks in a job based on per-job history and an inactive time threshold by observing tasks
* transitioning into one of the inactive states.
*/
-public class TaskHistoryPruner extends AbstractScheduledService implements EventSubscriber {
+public class TaskHistoryPruner extends PassiveService implements EventSubscriber {
private static final Logger LOG = LoggerFactory.getLogger(TaskHistoryPruner.class);
private final DelayExecutor executor;
@@ -64,7 +60,6 @@ public class TaskHistoryPruner extends AbstractScheduledService implements Event
private final Clock clock;
private final HistoryPrunnerSettings settings;
private final Storage storage;
- private final ConcurrentLinkedQueue<FutureTask<Void>> futureTasks;
private final Predicate<IScheduledTask> safeToDelete = new Predicate<IScheduledTask>() {
@Override
@@ -103,7 +98,6 @@ public class TaskHistoryPruner extends AbstractScheduledService implements Event
this.clock = requireNonNull(clock);
this.settings = requireNonNull(settings);
this.storage = requireNonNull(storage);
- this.futureTasks = Queues.newConcurrentLinkedQueue();
}
@VisibleForTesting
@@ -133,22 +127,6 @@ public class TaskHistoryPruner extends AbstractScheduledService implements Event
}
}
- @Override
- protected void runOneIteration() throws Exception {
- // Check if the prune attempts fail and propagate the exception. This will trigger
- // service (and the scheduler) to shut down.
- FutureTask<Void> future;
-
- while ((future = futureTasks.poll()) != null) {
- future.get();
- }
- }
-
- @Override
- protected Scheduler scheduler() {
- return AbstractScheduledService.Scheduler.newFixedDelaySchedule(0, 5, TimeUnit.SECONDS);
- }
-
private void deleteTasks(final Set<String> taskIds) {
LOG.info("Pruning inactive tasks " + taskIds);
storage.write(
@@ -160,6 +138,16 @@ public class TaskHistoryPruner extends AbstractScheduledService implements Event
return Query.jobScoped(jobKey).byStatus(apiConstants.TERMINAL_STATES);
}
+ private Runnable failureNotifyingRunnable(Runnable runnable) {
+ return () -> {
+ try {
+ runnable.run();
+ } catch (Throwable t) {
+ notifyFailed(t);
+ }
+ };
+ }
+
private void registerInactiveTask(
final IJobKey jobKey,
final String taskId,
@@ -167,15 +155,14 @@ public class TaskHistoryPruner extends AbstractScheduledService implements Event
LOG.debug("Prune task " + taskId + " in " + timeRemaining + " ms.");
- FutureTask<Void> pruneSingleTask = new FutureTask<>(() -> {
- LOG.info("Pruning expired inactive task " + taskId);
- deleteTasks(ImmutableSet.of(taskId));
- }, null);
- futureTasks.add(pruneSingleTask);
+ executor.execute(
+ failureNotifyingRunnable(() -> {
+ LOG.info("Pruning expired inactive task " + taskId);
+ deleteTasks(ImmutableSet.of(taskId));
+ }),
+ Amount.of(timeRemaining, Time.MILLISECONDS));
- executor.execute(pruneSingleTask, Amount.of(timeRemaining, Time.MILLISECONDS));
-
- FutureTask<Void> pruneRemainingTasksFromJob = new FutureTask<>(() -> {
+ executor.execute(failureNotifyingRunnable(() -> {
Iterable<IScheduledTask> inactiveTasks =
Storage.Util.fetchTasks(storage, jobHistoryQuery(jobKey));
int numInactiveTasks = Iterables.size(inactiveTasks);
@@ -191,9 +178,6 @@ public class TaskHistoryPruner extends AbstractScheduledService implements Event
deleteTasks(toPrune);
}
}
- }, null);
- futureTasks.add(pruneRemainingTasksFromJob);
-
- executor.execute(pruneRemainingTasksFromJob);
+ }));
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/eae68602/src/test/java/org/apache/aurora/LifecycleShutdownListenerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/LifecycleShutdownListenerTest.java b/src/test/java/org/apache/aurora/LifecycleShutdownListenerTest.java
index 8d19c04..d91dc27 100644
--- a/src/test/java/org/apache/aurora/LifecycleShutdownListenerTest.java
+++ b/src/test/java/org/apache/aurora/LifecycleShutdownListenerTest.java
@@ -13,11 +13,10 @@
*/
package org.apache.aurora;
-import com.google.common.util.concurrent.AbstractService;
-import com.google.common.util.concurrent.Service;
import com.google.common.util.concurrent.ServiceManager;
import org.apache.aurora.GuavaUtils.LifecycleShutdownListener;
+import org.apache.aurora.GuavaUtils.PassiveService;
import org.apache.aurora.common.application.Lifecycle;
import org.apache.aurora.common.base.Command;
import org.apache.aurora.common.testing.easymock.EasyMockTest;
@@ -26,18 +25,6 @@ import org.junit.Test;
public class LifecycleShutdownListenerTest extends EasyMockTest {
- private static final Service NOOP_SERVICE = new AbstractService() {
- @Override
- protected void doStart() {
- // Noop.
- }
-
- @Override
- protected void doStop() {
- // Noop.
- }
- };
-
private Command shutdown;
private ServiceManager.Listener listener;
@@ -53,6 +40,6 @@ public class LifecycleShutdownListenerTest extends EasyMockTest {
control.replay();
- listener.failure(NOOP_SERVICE);
+ listener.failure(new PassiveService());
}
}