You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ma...@apache.org on 2016/03/10 21:32:45 UTC
aurora git commit: Log and terminate scheduler on updater thread failure.
Repository: aurora
Updated Branches:
refs/heads/master c5f94e05f -> 72bf8dbd2
Log and terminate scheduler on updater thread failure.
Bugs closed: AURORA-1630
Reviewed at https://reviews.apache.org/r/44493/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/72bf8dbd
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/72bf8dbd
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/72bf8dbd
Branch: refs/heads/master
Commit: 72bf8dbd20221806806e964870f6f592fb6713f8
Parents: c5f94e0
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Thu Mar 10 12:32:38 2016 -0800
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Thu Mar 10 12:32:38 2016 -0800
----------------------------------------------------------------------
.../apache/aurora/scheduler/base/AsyncUtil.java | 31 ++++++++
.../scheduler/pruning/TaskHistoryPruner.java | 67 ++++++++--------
.../updater/JobUpdateControllerImpl.java | 82 ++++++++++++--------
.../aurora/scheduler/updater/UpdaterModule.java | 9 ++-
.../aurora/scheduler/updater/JobUpdaterIT.java | 41 ++++++++++
5 files changed, 159 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/72bf8dbd/src/main/java/org/apache/aurora/scheduler/base/AsyncUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/AsyncUtil.java b/src/main/java/org/apache/aurora/scheduler/base/AsyncUtil.java
index 80dc35e..474b6e0 100644
--- a/src/main/java/org/apache/aurora/scheduler/base/AsyncUtil.java
+++ b/src/main/java/org/apache/aurora/scheduler/base/AsyncUtil.java
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicLong;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.aurora.common.application.Lifecycle;
import org.apache.aurora.common.stats.Stats;
import org.slf4j.Logger;
@@ -112,6 +113,36 @@ public final class AsyncUtil {
};
}
+ /**
+ * Wraps a {@link Runnable} so that anything it throws is logged and triggers a shutdown.
+ *
+ * @param lifecycle {@link Lifecycle} whose {@code shutdown()} is called on failure.
+ * @param logger Logger that receives the error together with the caught {@link Throwable}.
+ * @param message message to log at error level.
+ * @param runnable {@link Runnable} to wrap.
+ * @return A new {@link Runnable} logging an error and calling {@link Lifecycle#shutdown()}.
+ */
+ public static Runnable shutdownOnError(
+ Lifecycle lifecycle,
+ Logger logger,
+ String message,
+ Runnable runnable) {
+
+ requireNonNull(lifecycle);
+ requireNonNull(logger);
+ requireNonNull(message);
+ requireNonNull(runnable);
+
+ return () -> {
+ try {
+ runnable.run();
+ } catch (Throwable t) {
+ logger.error(message, t);
+ lifecycle.shutdown();
+ }
+ };
+ }
+
private static void evaluateResult(Runnable runnable, Throwable throwable, Logger logger) {
// See java.util.concurrent.ThreadPoolExecutor#afterExecute(Runnable, Throwable)
// for more details and an implementation example.
http://git-wip-us.apache.org/repos/asf/aurora/blob/72bf8dbd/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
index 22753b4..f07746c 100644
--- a/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
+++ b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
@@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory;
import static java.util.Objects.requireNonNull;
+import static org.apache.aurora.scheduler.base.AsyncUtil.shutdownOnError;
import static org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
import static org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
@@ -52,6 +53,8 @@ import static org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
*/
public class TaskHistoryPruner implements EventSubscriber {
private static final Logger LOG = LoggerFactory.getLogger(TaskHistoryPruner.class);
+ private static final String FATAL_ERROR_FORMAT =
+ "Unexpected problem pruning task history for %s. Triggering shutdown";
private final DelayExecutor executor;
private final StateManager stateManager;
@@ -137,19 +140,6 @@ public class TaskHistoryPruner implements EventSubscriber {
return Query.jobScoped(jobKey).byStatus(apiConstants.TERMINAL_STATES);
}
- private Runnable shutdownOnError(String subject, Runnable runnable) {
- return () -> {
- try {
- runnable.run();
- } catch (Throwable t) {
- LOG.error(
- "Unexpected problem pruning task history for " + subject + ". Triggering shutdown",
- t);
- lifecycle.shutdown();
- }
- };
- }
-
private void registerInactiveTask(
final IJobKey jobKey,
final String taskId,
@@ -158,28 +148,37 @@ public class TaskHistoryPruner implements EventSubscriber {
LOG.debug("Prune task " + taskId + " in " + timeRemaining + " ms.");
executor.execute(
- shutdownOnError("task: " + taskId, () -> {
- LOG.info("Pruning expired inactive task " + taskId);
- deleteTasks(ImmutableSet.of(taskId));
- }),
+ shutdownOnError(
+ lifecycle,
+ LOG,
+ String.format(FATAL_ERROR_FORMAT, "task: " + taskId),
+ () -> {
+ LOG.info("Pruning expired inactive task " + taskId);
+ deleteTasks(ImmutableSet.of(taskId));
+ }),
Amount.of(timeRemaining, Time.MILLISECONDS));
- executor.execute(shutdownOnError("job: " + jobKey, () -> {
- Iterable<IScheduledTask> inactiveTasks =
- Storage.Util.fetchTasks(storage, jobHistoryQuery(jobKey));
- int numInactiveTasks = Iterables.size(inactiveTasks);
- int tasksToPrune = numInactiveTasks - settings.perJobHistoryGoal;
- if (tasksToPrune > 0 && numInactiveTasks > settings.perJobHistoryGoal) {
- Set<String> toPrune = FluentIterable
- .from(Tasks.LATEST_ACTIVITY.sortedCopy(inactiveTasks))
- .filter(safeToDelete)
- .limit(tasksToPrune)
- .transform(Tasks::id)
- .toSet();
- if (!toPrune.isEmpty()) {
- deleteTasks(toPrune);
- }
- }
- }));
+ executor.execute(
+ shutdownOnError(
+ lifecycle,
+ LOG,
+ String.format(FATAL_ERROR_FORMAT, "job: " + jobKey),
+ () -> {
+ Iterable<IScheduledTask> inactiveTasks =
+ Storage.Util.fetchTasks(storage, jobHistoryQuery(jobKey));
+ int numInactiveTasks = Iterables.size(inactiveTasks);
+ int tasksToPrune = numInactiveTasks - settings.perJobHistoryGoal;
+ if (tasksToPrune > 0 && numInactiveTasks > settings.perJobHistoryGoal) {
+ Set<String> toPrune = FluentIterable
+ .from(Tasks.LATEST_ACTIVITY.sortedCopy(inactiveTasks))
+ .filter(safeToDelete)
+ .limit(tasksToPrune)
+ .transform(Tasks::id)
+ .toSet();
+ if (!toPrune.isEmpty()) {
+ deleteTasks(toPrune);
+ }
+ }
+ }));
}
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/72bf8dbd/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
index 48d7e2a..364c5c7 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
@@ -31,6 +31,7 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
+import org.apache.aurora.common.application.Lifecycle;
import org.apache.aurora.common.collections.Pair;
import org.apache.aurora.common.quantity.Amount;
import org.apache.aurora.common.quantity.Time;
@@ -81,6 +82,7 @@ import static org.apache.aurora.gen.JobUpdateStatus.ROLLED_FORWARD;
import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_BACK;
import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_FORWARD;
import static org.apache.aurora.gen.JobUpdateStatus.ROLL_FORWARD_AWAITING_PULSE;
+import static org.apache.aurora.scheduler.base.AsyncUtil.shutdownOnError;
import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.ACTIVE_QUERY;
import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.AUTO_RESUME_STATES;
@@ -107,6 +109,8 @@ import static org.apache.aurora.scheduler.updater.SideEffect.InstanceUpdateStatu
*/
class JobUpdateControllerImpl implements JobUpdateController {
private static final Logger LOG = LoggerFactory.getLogger(JobUpdateControllerImpl.class);
+ private static final String FATAL_ERROR_FORMAT =
+ "Unexpected problem running asynchronous updater for: %s. Triggering shutdown";
private final UpdateFactory updateFactory;
private final LockManager lockManager;
@@ -115,6 +119,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
private final StateManager stateManager;
private final Clock clock;
private final PulseHandler pulseHandler;
+ private final Lifecycle lifecycle;
// Currently-active updaters. An active updater is one that is rolling forward or back. Paused
// and completed updates are represented only in storage, not here.
@@ -128,7 +133,8 @@ class JobUpdateControllerImpl implements JobUpdateController {
Storage storage,
ScheduledExecutorService executor,
StateManager stateManager,
- Clock clock) {
+ Clock clock,
+ Lifecycle lifecycle) {
this.updateFactory = requireNonNull(updateFactory);
this.lockManager = requireNonNull(lockManager);
@@ -136,6 +142,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
this.executor = requireNonNull(executor);
this.stateManager = requireNonNull(stateManager);
this.clock = requireNonNull(clock);
+ this.lifecycle = requireNonNull(lifecycle);
this.pulseHandler = new PulseHandler(clock);
}
@@ -289,15 +296,19 @@ class JobUpdateControllerImpl implements JobUpdateController {
if (JobUpdateStateMachine.isAwaitingPulse(state.getStatus())) {
// Attempt to unblock a job update previously blocked on expired pulse.
- executor.execute(() -> {
- try {
- unscopedChangeUpdateStatus(
- key,
- status -> new JobUpdateEvent().setStatus(GET_UNBLOCKED_STATE.apply(status)));
- } catch (UpdateStateException e) {
- LOG.error("Error while processing job update pulse: " + e);
- }
- });
+ executor.execute(shutdownOnError(
+ lifecycle,
+ LOG,
+ String.format(FATAL_ERROR_FORMAT, key),
+ () -> {
+ try {
+ unscopedChangeUpdateStatus(
+ key,
+ status -> new JobUpdateEvent().setStatus(GET_UNBLOCKED_STATE.apply(status)));
+ } catch (UpdateStateException e) {
+ LOG.error(String.format("Error processing job update pulse for %s: %s", key, e));
+ }
+ }));
}
return JobUpdatePulseStatus.OK;
@@ -694,29 +705,34 @@ class JobUpdateControllerImpl implements JobUpdateController {
}
private Runnable getDeferredEvaluator(final IInstanceKey instance, final IJobUpdateKey key) {
- return () -> storage.write((NoResult.Quiet) storeProvider -> {
- IJobUpdateSummary summary =
- getOnlyMatch(storeProvider.getJobUpdateStore(), queryByUpdate(key));
- JobUpdateStatus status = summary.getState().getStatus();
- // Suppress this evaluation if the updater is not currently active.
- if (JobUpdateStateMachine.isActive(status)) {
- UpdateFactory.Update update = updates.get(instance.getJobKey());
- try {
- evaluateUpdater(
- storeProvider,
- update,
- summary,
- ImmutableMap.of(
- instance.getInstanceId(),
- getActiveInstance(
- storeProvider.getTaskStore(),
- instance.getJobKey(),
- instance.getInstanceId())));
- } catch (UpdateStateException e) {
- throw Throwables.propagate(e);
- }
- }
- });
+ return shutdownOnError(
+ lifecycle,
+ LOG,
+ String.format(FATAL_ERROR_FORMAT, "Key: " + key + " Instance key: " + instance),
+ () -> storage.write((NoResult.Quiet) storeProvider -> {
+ IJobUpdateSummary summary =
+ getOnlyMatch(storeProvider.getJobUpdateStore(), queryByUpdate(key));
+ JobUpdateStatus status = summary.getState().getStatus();
+ // Suppress this evaluation if the updater is not currently active.
+ if (JobUpdateStateMachine.isActive(status)) {
+ UpdateFactory.Update update = updates.get(instance.getJobKey());
+ try {
+ evaluateUpdater(
+ storeProvider,
+ update,
+ summary,
+ ImmutableMap.of(
+ instance.getInstanceId(),
+ getActiveInstance(
+ storeProvider.getTaskStore(),
+ instance.getJobKey(),
+ instance.getInstanceId())));
+ } catch (UpdateStateException e) {
+ LOG.error(String.format("Error running deferred evaluation for %s: %s", instance, e));
+ Throwables.propagate(e);
+ }
+ }
+ }));
}
private static class PulseHandler {
http://git-wip-us.apache.org/repos/asf/aurora/blob/72bf8dbd/src/main/java/org/apache/aurora/scheduler/updater/UpdaterModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/UpdaterModule.java b/src/main/java/org/apache/aurora/scheduler/updater/UpdaterModule.java
index c0472d7..13cbdad 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/UpdaterModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/UpdaterModule.java
@@ -14,29 +14,30 @@
package org.apache.aurora.scheduler.updater;
import java.util.Objects;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import javax.inject.Singleton;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.AbstractModule;
import com.google.inject.PrivateModule;
import org.apache.aurora.scheduler.SchedulerServicesModule;
+import org.apache.aurora.scheduler.base.AsyncUtil;
import org.apache.aurora.scheduler.events.PubsubEventModule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Binding module for scheduling logic and higher-level state management.
*/
public class UpdaterModule extends AbstractModule {
+ private static final Logger LOG = LoggerFactory.getLogger(UpdaterModule.class);
private final ScheduledExecutorService executor;
public UpdaterModule() {
- this(Executors.newSingleThreadScheduledExecutor(
- new ThreadFactoryBuilder().setDaemon(true).setNameFormat("updater-%d").build()));
+ this(AsyncUtil.singleThreadLoggingScheduledExecutor("updater-%d", LOG));
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/aurora/blob/72bf8dbd/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
index b39e388..cc88915 100644
--- a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
@@ -34,6 +34,8 @@ import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
+import org.apache.aurora.common.application.Lifecycle;
+import org.apache.aurora.common.base.Command;
import org.apache.aurora.common.quantity.Amount;
import org.apache.aurora.common.quantity.Time;
import org.apache.aurora.common.stats.Stats;
@@ -149,6 +151,7 @@ public class JobUpdaterIT extends EasyMockTest {
private LockManager lockManager;
private StateManager stateManager;
private JobUpdateEventSubscriber subscriber;
+ private Command shutdownCommand;
private static ITaskConfig setExecutorData(ITaskConfig task, String executorData) {
TaskConfig builder = task.newBuilder();
@@ -163,6 +166,7 @@ public class JobUpdaterIT extends EasyMockTest {
ScheduledExecutorService executor = createMock(ScheduledExecutorService.class);
clock = FakeScheduledExecutor.scheduleExecutor(executor);
driver = createMock(Driver.class);
+ shutdownCommand = createMock(Command.class);
eventBus = new EventBus();
Injector injector = Guice.createInjector(
@@ -186,6 +190,7 @@ public class JobUpdaterIT extends EasyMockTest {
bind(EventSink.class).toInstance(eventBus::post);
bind(LockManager.class).to(LockManagerImpl.class);
bind(UUIDGenerator.class).to(UUIDGeneratorImpl.class);
+ bind(Lifecycle.class).toInstance(new Lifecycle(shutdownCommand));
}
});
updater = injector.getInstance(JobUpdateController.class);
@@ -668,6 +673,42 @@ public class JobUpdaterIT extends EasyMockTest {
updater.pulse(IJobUpdateKey.build(new JobUpdateKey(JOB.newBuilder(), "invalid"))));
}
+ @Test(expected = IllegalStateException.class)
+ public void testShutdownOnFailedPulse() throws Exception {
+ // No kill expectation is set up; the resulting failure must trigger this shutdown command.
+ shutdownCommand.execute();
+ expectLastCall().andAnswer(() -> {
+ storage.write((NoResult.Quiet) storeProvider -> releaseAllLocks());
+ throw new IllegalStateException("Expected shutdown triggered.");
+ });
+
+ control.replay();
+
+ JobUpdate builder = makeJobUpdate(
+ // No-op - instance 0 already matches the new config.
+ makeInstanceConfig(0, 0, NEW_CONFIG),
+ // Instances 1-2 still use the old config and need updating.
+ makeInstanceConfig(1, 2, OLD_CONFIG)).newBuilder();
+
+ builder.getInstructions().getSettings().setBlockIfNoPulsesAfterMs((int) PULSE_TIMEOUT_MS);
+ insertInitialTasks(IJobUpdate.build(builder));
+
+ changeState(JOB, 0, ASSIGNED, STARTING, RUNNING);
+ changeState(JOB, 1, ASSIGNED, STARTING, RUNNING);
+ changeState(JOB, 2, ASSIGNED, STARTING, RUNNING);
+ clock.advance(WATCH_TIMEOUT);
+
+ ImmutableMultimap.Builder<Integer, JobUpdateAction> actions = ImmutableMultimap.builder();
+ updater.start(IJobUpdate.build(builder), AUDIT);
+
+ // The update is blocked initially waiting for a pulse; no instance actions recorded yet.
+ assertState(ROLL_FORWARD_AWAITING_PULSE, actions.build());
+
+ // Pulse is accepted (OK); the unblocked update should then fail and trigger shutdown.
+ assertEquals(JobUpdatePulseStatus.OK, updater.pulse(UPDATE_ID));
+ changeState(JOB, 1, KILLED, ASSIGNED, STARTING, RUNNING);
+ }
+
@Test
public void testSuccessfulBatchedUpdate() throws Exception {
expectTaskKilled().times(3);