You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ma...@apache.org on 2016/03/10 21:32:45 UTC

aurora git commit: Log and terminate scheduler on updater thread failure.

Repository: aurora
Updated Branches:
  refs/heads/master c5f94e05f -> 72bf8dbd2


Log and terminate scheduler on updater thread failure.

Bugs closed: AURORA-1630

Reviewed at https://reviews.apache.org/r/44493/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/72bf8dbd
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/72bf8dbd
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/72bf8dbd

Branch: refs/heads/master
Commit: 72bf8dbd20221806806e964870f6f592fb6713f8
Parents: c5f94e0
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Thu Mar 10 12:32:38 2016 -0800
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Thu Mar 10 12:32:38 2016 -0800

----------------------------------------------------------------------
 .../apache/aurora/scheduler/base/AsyncUtil.java | 31 ++++++++
 .../scheduler/pruning/TaskHistoryPruner.java    | 67 ++++++++--------
 .../updater/JobUpdateControllerImpl.java        | 82 ++++++++++++--------
 .../aurora/scheduler/updater/UpdaterModule.java |  9 ++-
 .../aurora/scheduler/updater/JobUpdaterIT.java  | 41 ++++++++++
 5 files changed, 159 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/72bf8dbd/src/main/java/org/apache/aurora/scheduler/base/AsyncUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/base/AsyncUtil.java b/src/main/java/org/apache/aurora/scheduler/base/AsyncUtil.java
index 80dc35e..474b6e0 100644
--- a/src/main/java/org/apache/aurora/scheduler/base/AsyncUtil.java
+++ b/src/main/java/org/apache/aurora/scheduler/base/AsyncUtil.java
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
+import org.apache.aurora.common.application.Lifecycle;
 import org.apache.aurora.common.stats.Stats;
 import org.slf4j.Logger;
 
@@ -112,6 +113,36 @@ public final class AsyncUtil {
         };
   }
 
+  /**
+   * Wraps a {@link Runnable} so any {@link Throwable} it throws is logged and triggers shutdown.
+   *
+   * @param lifecycle {@link Lifecycle} instance.
+   * @param logger Logger instance.
+   * @param message Message to log together with the caught error.
+   * @param runnable {@link Runnable} to wrap.
+   * @return A new {@link Runnable} logging an error and calling {@link Lifecycle#shutdown()}.
+   */
+  public static Runnable shutdownOnError(
+      Lifecycle lifecycle,
+      Logger logger,
+      String message,
+      Runnable runnable) {
+
+    requireNonNull(lifecycle);
+    requireNonNull(logger);
+    requireNonNull(message);
+    requireNonNull(runnable);
+
+    return () -> {
+      try {
+        runnable.run();
+      } catch (Throwable t) {
+        logger.error(message, t);
+        lifecycle.shutdown();
+      }
+    };
+  }
+
   private static void evaluateResult(Runnable runnable, Throwable throwable, Logger logger) {
     // See java.util.concurrent.ThreadPoolExecutor#afterExecute(Runnable, Throwable)
     // for more details and an implementation example.

http://git-wip-us.apache.org/repos/asf/aurora/blob/72bf8dbd/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
index 22753b4..f07746c 100644
--- a/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
+++ b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
@@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory;
 
 import static java.util.Objects.requireNonNull;
 
+import static org.apache.aurora.scheduler.base.AsyncUtil.shutdownOnError;
 import static org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
 import static org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 
@@ -52,6 +53,8 @@ import static org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
  */
 public class TaskHistoryPruner implements EventSubscriber {
   private static final Logger LOG = LoggerFactory.getLogger(TaskHistoryPruner.class);
+  private static final String FATAL_ERROR_FORMAT =
+      "Unexpected problem pruning task history for %s. Triggering shutdown";
 
   private final DelayExecutor executor;
   private final StateManager stateManager;
@@ -137,19 +140,6 @@ public class TaskHistoryPruner implements EventSubscriber {
     return Query.jobScoped(jobKey).byStatus(apiConstants.TERMINAL_STATES);
   }
 
-  private Runnable shutdownOnError(String subject, Runnable runnable) {
-    return () -> {
-      try {
-        runnable.run();
-      } catch (Throwable t) {
-        LOG.error(
-            "Unexpected problem pruning task history for " + subject + ". Triggering shutdown",
-            t);
-        lifecycle.shutdown();
-      }
-    };
-  }
-
   private void registerInactiveTask(
       final IJobKey jobKey,
       final String taskId,
@@ -158,28 +148,37 @@ public class TaskHistoryPruner implements EventSubscriber {
     LOG.debug("Prune task " + taskId + " in " + timeRemaining + " ms.");
 
     executor.execute(
-        shutdownOnError("task: " + taskId, () -> {
-          LOG.info("Pruning expired inactive task " + taskId);
-          deleteTasks(ImmutableSet.of(taskId));
-        }),
+        shutdownOnError(
+            lifecycle,
+            LOG,
+            String.format(FATAL_ERROR_FORMAT, "task: " + taskId),
+            () -> {
+              LOG.info("Pruning expired inactive task " + taskId);
+              deleteTasks(ImmutableSet.of(taskId));
+            }),
         Amount.of(timeRemaining, Time.MILLISECONDS));
 
-    executor.execute(shutdownOnError("job: " + jobKey, () -> {
-      Iterable<IScheduledTask> inactiveTasks =
-          Storage.Util.fetchTasks(storage, jobHistoryQuery(jobKey));
-      int numInactiveTasks = Iterables.size(inactiveTasks);
-      int tasksToPrune = numInactiveTasks - settings.perJobHistoryGoal;
-      if (tasksToPrune > 0 && numInactiveTasks > settings.perJobHistoryGoal) {
-        Set<String> toPrune = FluentIterable
-            .from(Tasks.LATEST_ACTIVITY.sortedCopy(inactiveTasks))
-            .filter(safeToDelete)
-            .limit(tasksToPrune)
-            .transform(Tasks::id)
-            .toSet();
-        if (!toPrune.isEmpty()) {
-          deleteTasks(toPrune);
-        }
-      }
-    }));
+    executor.execute(
+        shutdownOnError(
+            lifecycle,
+            LOG,
+            String.format(FATAL_ERROR_FORMAT, "job: " + jobKey),
+            () -> {
+              Iterable<IScheduledTask> inactiveTasks =
+                  Storage.Util.fetchTasks(storage, jobHistoryQuery(jobKey));
+              int numInactiveTasks = Iterables.size(inactiveTasks);
+              int tasksToPrune = numInactiveTasks - settings.perJobHistoryGoal;
+              if (tasksToPrune > 0 && numInactiveTasks > settings.perJobHistoryGoal) {
+                Set<String> toPrune = FluentIterable
+                    .from(Tasks.LATEST_ACTIVITY.sortedCopy(inactiveTasks))
+                    .filter(safeToDelete)
+                    .limit(tasksToPrune)
+                    .transform(Tasks::id)
+                    .toSet();
+                if (!toPrune.isEmpty()) {
+                  deleteTasks(toPrune);
+                }
+              }
+            }));
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/72bf8dbd/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
index 48d7e2a..364c5c7 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
@@ -31,6 +31,7 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import com.google.inject.Inject;
 
+import org.apache.aurora.common.application.Lifecycle;
 import org.apache.aurora.common.collections.Pair;
 import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Time;
@@ -81,6 +82,7 @@ import static org.apache.aurora.gen.JobUpdateStatus.ROLLED_FORWARD;
 import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_BACK;
 import static org.apache.aurora.gen.JobUpdateStatus.ROLLING_FORWARD;
 import static org.apache.aurora.gen.JobUpdateStatus.ROLL_FORWARD_AWAITING_PULSE;
+import static org.apache.aurora.scheduler.base.AsyncUtil.shutdownOnError;
 import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.ACTIVE_QUERY;
 import static org.apache.aurora.scheduler.updater.JobUpdateStateMachine.AUTO_RESUME_STATES;
@@ -107,6 +109,8 @@ import static org.apache.aurora.scheduler.updater.SideEffect.InstanceUpdateStatu
  */
 class JobUpdateControllerImpl implements JobUpdateController {
   private static final Logger LOG = LoggerFactory.getLogger(JobUpdateControllerImpl.class);
+  private static final String FATAL_ERROR_FORMAT =
+      "Unexpected problem running asynchronous updater for: %s. Triggering shutdown";
 
   private final UpdateFactory updateFactory;
   private final LockManager lockManager;
@@ -115,6 +119,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
   private final StateManager stateManager;
   private final Clock clock;
   private final PulseHandler pulseHandler;
+  private final Lifecycle lifecycle;
 
   // Currently-active updaters. An active updater is one that is rolling forward or back. Paused
   // and completed updates are represented only in storage, not here.
@@ -128,7 +133,8 @@ class JobUpdateControllerImpl implements JobUpdateController {
       Storage storage,
       ScheduledExecutorService executor,
       StateManager stateManager,
-      Clock clock) {
+      Clock clock,
+      Lifecycle lifecycle) {
 
     this.updateFactory = requireNonNull(updateFactory);
     this.lockManager = requireNonNull(lockManager);
@@ -136,6 +142,7 @@ class JobUpdateControllerImpl implements JobUpdateController {
     this.executor = requireNonNull(executor);
     this.stateManager = requireNonNull(stateManager);
     this.clock = requireNonNull(clock);
+    this.lifecycle = requireNonNull(lifecycle);
     this.pulseHandler = new PulseHandler(clock);
   }
 
@@ -289,15 +296,19 @@ class JobUpdateControllerImpl implements JobUpdateController {
 
     if (JobUpdateStateMachine.isAwaitingPulse(state.getStatus())) {
       // Attempt to unblock a job update previously blocked on expired pulse.
-      executor.execute(() -> {
-        try {
-          unscopedChangeUpdateStatus(
-              key,
-              status -> new JobUpdateEvent().setStatus(GET_UNBLOCKED_STATE.apply(status)));
-        } catch (UpdateStateException e) {
-          LOG.error("Error while processing job update pulse: " + e);
-        }
-      });
+      executor.execute(shutdownOnError(
+          lifecycle,
+          LOG,
+          String.format(FATAL_ERROR_FORMAT, key),
+          () -> {
+            try {
+              unscopedChangeUpdateStatus(
+                  key,
+                  status -> new JobUpdateEvent().setStatus(GET_UNBLOCKED_STATE.apply(status)));
+            } catch (UpdateStateException e) {
+              LOG.error(String.format("Error processing job update pulse for %s: %s", key, e));
+            }
+          }));
     }
 
     return JobUpdatePulseStatus.OK;
@@ -694,29 +705,34 @@ class JobUpdateControllerImpl implements JobUpdateController {
   }
 
   private Runnable getDeferredEvaluator(final IInstanceKey instance, final IJobUpdateKey key) {
-    return () -> storage.write((NoResult.Quiet) storeProvider -> {
-      IJobUpdateSummary summary =
-          getOnlyMatch(storeProvider.getJobUpdateStore(), queryByUpdate(key));
-      JobUpdateStatus status = summary.getState().getStatus();
-      // Suppress this evaluation if the updater is not currently active.
-      if (JobUpdateStateMachine.isActive(status)) {
-        UpdateFactory.Update update = updates.get(instance.getJobKey());
-        try {
-          evaluateUpdater(
-              storeProvider,
-              update,
-              summary,
-              ImmutableMap.of(
-                  instance.getInstanceId(),
-                  getActiveInstance(
-                      storeProvider.getTaskStore(),
-                      instance.getJobKey(),
-                      instance.getInstanceId())));
-        } catch (UpdateStateException e) {
-          throw Throwables.propagate(e);
-        }
-      }
-    });
+    return shutdownOnError(
+        lifecycle,
+        LOG,
+        String.format(FATAL_ERROR_FORMAT, "Key: " + key + " Instance key: " + instance),
+        () -> storage.write((NoResult.Quiet) storeProvider -> {
+          IJobUpdateSummary summary =
+              getOnlyMatch(storeProvider.getJobUpdateStore(), queryByUpdate(key));
+          JobUpdateStatus status = summary.getState().getStatus();
+          // Suppress this evaluation if the updater is not currently active.
+          if (JobUpdateStateMachine.isActive(status)) {
+            UpdateFactory.Update update = updates.get(instance.getJobKey());
+            try {
+              evaluateUpdater(
+                  storeProvider,
+                  update,
+                  summary,
+                  ImmutableMap.of(
+                      instance.getInstanceId(),
+                      getActiveInstance(
+                          storeProvider.getTaskStore(),
+                          instance.getJobKey(),
+                          instance.getInstanceId())));
+            } catch (UpdateStateException e) {
+              LOG.error(String.format("Error running deferred evaluation for %s: %s", instance, e));
+              Throwables.propagate(e);
+            }
+          }
+        }));
   }
 
   private static class PulseHandler {

http://git-wip-us.apache.org/repos/asf/aurora/blob/72bf8dbd/src/main/java/org/apache/aurora/scheduler/updater/UpdaterModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/updater/UpdaterModule.java b/src/main/java/org/apache/aurora/scheduler/updater/UpdaterModule.java
index c0472d7..13cbdad 100644
--- a/src/main/java/org/apache/aurora/scheduler/updater/UpdaterModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/updater/UpdaterModule.java
@@ -14,29 +14,30 @@
 package org.apache.aurora.scheduler.updater;
 
 import java.util.Objects;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 
 import javax.inject.Singleton;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.inject.AbstractModule;
 import com.google.inject.PrivateModule;
 
 import org.apache.aurora.scheduler.SchedulerServicesModule;
+import org.apache.aurora.scheduler.base.AsyncUtil;
 import org.apache.aurora.scheduler.events.PubsubEventModule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Binding module for scheduling logic and higher-level state management.
  */
 public class UpdaterModule extends AbstractModule {
+  private static final Logger LOG = LoggerFactory.getLogger(UpdaterModule.class);
 
   private final ScheduledExecutorService executor;
 
   public UpdaterModule() {
-    this(Executors.newSingleThreadScheduledExecutor(
-        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("updater-%d").build()));
+    this(AsyncUtil.singleThreadLoggingScheduledExecutor("updater-%d", LOG));
   }
 
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/aurora/blob/72bf8dbd/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
index b39e388..cc88915 100644
--- a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
@@ -34,6 +34,8 @@ import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 
+import org.apache.aurora.common.application.Lifecycle;
+import org.apache.aurora.common.base.Command;
 import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Time;
 import org.apache.aurora.common.stats.Stats;
@@ -149,6 +151,7 @@ public class JobUpdaterIT extends EasyMockTest {
   private LockManager lockManager;
   private StateManager stateManager;
   private JobUpdateEventSubscriber subscriber;
+  private Command shutdownCommand;
 
   private static ITaskConfig setExecutorData(ITaskConfig task, String executorData) {
     TaskConfig builder = task.newBuilder();
@@ -163,6 +166,7 @@ public class JobUpdaterIT extends EasyMockTest {
     ScheduledExecutorService executor = createMock(ScheduledExecutorService.class);
     clock = FakeScheduledExecutor.scheduleExecutor(executor);
     driver = createMock(Driver.class);
+    shutdownCommand = createMock(Command.class);
     eventBus = new EventBus();
 
     Injector injector = Guice.createInjector(
@@ -186,6 +190,7 @@ public class JobUpdaterIT extends EasyMockTest {
             bind(EventSink.class).toInstance(eventBus::post);
             bind(LockManager.class).to(LockManagerImpl.class);
             bind(UUIDGenerator.class).to(UUIDGeneratorImpl.class);
+            bind(Lifecycle.class).toInstance(new Lifecycle(shutdownCommand));
           }
         });
     updater = injector.getInstance(JobUpdateController.class);
@@ -668,6 +673,42 @@ public class JobUpdaterIT extends EasyMockTest {
         updater.pulse(IJobUpdateKey.build(new JobUpdateKey(JOB.newBuilder(), "invalid"))));
   }
 
+  @Test(expected = IllegalStateException.class)
+  public void testShutdownOnFailedPulse() throws Exception {
+    // No task kill is expected here, so the updater's kill attempt fails and triggers shutdown.
+    shutdownCommand.execute();
+    expectLastCall().andAnswer(() -> {
+      storage.write((NoResult.Quiet) storeProvider -> releaseAllLocks());
+      throw new IllegalStateException("Expected shutdown triggered.");
+    });
+
+    control.replay();
+
+    JobUpdate builder = makeJobUpdate(
+        // No-op - task is already matching the new config.
+        makeInstanceConfig(0, 0, NEW_CONFIG),
+        // Tasks needing update.
+        makeInstanceConfig(1, 2, OLD_CONFIG)).newBuilder();
+
+    builder.getInstructions().getSettings().setBlockIfNoPulsesAfterMs((int) PULSE_TIMEOUT_MS);
+    insertInitialTasks(IJobUpdate.build(builder));
+
+    changeState(JOB, 0, ASSIGNED, STARTING, RUNNING);
+    changeState(JOB, 1, ASSIGNED, STARTING, RUNNING);
+    changeState(JOB, 2, ASSIGNED, STARTING, RUNNING);
+    clock.advance(WATCH_TIMEOUT);
+
+    ImmutableMultimap.Builder<Integer, JobUpdateAction> actions = ImmutableMultimap.builder();
+    updater.start(IJobUpdate.build(builder), AUDIT);
+
+    // The update is blocked initially waiting for a pulse.
+    assertState(ROLL_FORWARD_AWAITING_PULSE, actions.build());
+
+    // Pulse arrives and update starts.
+    assertEquals(JobUpdatePulseStatus.OK, updater.pulse(UPDATE_ID));
+    changeState(JOB, 1, KILLED, ASSIGNED, STARTING, RUNNING);
+  }
+
   @Test
   public void testSuccessfulBatchedUpdate() throws Exception {
     expectTaskKilled().times(3);