You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by dl...@apache.org on 2022/03/04 12:59:56 UTC

[accumulo] branch main updated: Check scheduled tasks periodically for errors (#2524)

This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 7fb1b5d  Check scheduled tasks periodically for errors (#2524)
7fb1b5d is described below

commit 7fb1b5df9e3abe18304039fb3e781b2617eec4b5
Author: Dave Marion <dl...@apache.org>
AuthorDate: Fri Mar 4 07:59:03 2022 -0500

    Check scheduled tasks periodically for errors (#2524)
    
    Updated code that used ScheduledExecutorServices and was not retaining a reference to the ScheduledFuture object returned by the methods that create scheduled tasks. If the code was adding a non-scheduled task via the `submit` method, I changed it to use the `execute` method. For code that was adding a scheduled task but not doing anything with the returned ScheduledFuture, I modified it to pass the ScheduledFuture reference to a utility method in ThreadPools. The new ThreadPools ut [...]
---
 .../DefaultContextClassLoaderFactory.java          |   4 +-
 .../core/clientImpl/ConditionalWriterImpl.java     |   7 +-
 .../core/clientImpl/TabletServerBatchWriter.java   |  36 +++---
 .../file/blockfile/cache/lru/LruBlockCache.java    |   6 +-
 .../blockfile/cache/tinylfu/TinyLfuBlockCache.java |   5 +-
 .../org/apache/accumulo/core/summary/Gatherer.java |  10 +-
 .../util/ratelimit/SharedRateLimiterFactory.java   |  10 +-
 .../accumulo/core/util/threads/ThreadPools.java    | 124 +++++++++++++++++++++
 .../main/java/org/apache/accumulo/fate/Fate.java   |   4 +-
 .../core/file/rfile/MultiThreadedRFileTest.java    |   2 +-
 pom.xml                                            |   3 +-
 .../org/apache/accumulo/server/ServerContext.java  |   4 +-
 .../accumulo/server/client/BulkImporter.java       |   6 +-
 .../server/compaction/CompactionWatcher.java       |   5 +-
 .../org/apache/accumulo/server/fs/FileManager.java |   6 +-
 .../accumulo/server/manager/LiveTServerSet.java    |   6 +-
 .../apache/accumulo/server/rpc/TServerUtils.java   |  30 ++---
 .../accumulo/server/util/FileSystemMonitor.java    |  26 ++---
 .../server/util/RemoveEntriesForMissingFiles.java  |   2 +-
 .../server/zookeeper/DistributedWorkQueue.java     |  29 ++---
 .../coordinator/CompactionCoordinator.java         |  11 +-
 .../accumulo/coordinator/CompactionFinalizer.java  |   4 +-
 .../coordinator/DeadCompactionDetector.java        |   5 +-
 .../org/apache/accumulo/compactor/Compactor.java   |  11 +-
 .../java/org/apache/accumulo/manager/Manager.java  |  13 ++-
 .../org/apache/accumulo/manager/ManagerTime.java   |   6 +-
 .../manager/metrics/ReplicationMetrics.java        |   6 +-
 .../accumulo/manager/metrics/fate/FateMetrics.java |   5 +-
 .../accumulo/manager/recovery/RecoveryManager.java |   9 +-
 .../util/logging/AccumuloMonitorAppender.java      |   6 +-
 .../apache/accumulo/tserver/AssignmentHandler.java |  33 +++---
 .../org/apache/accumulo/tserver/TabletServer.java  |  77 +++++++------
 .../tserver/TabletServerResourceManager.java       |  11 +-
 .../accumulo/tserver/log/TabletServerLogger.java   |   2 +-
 .../metrics/CompactionExecutorsMetrics.java        |   7 +-
 .../accumulo/tserver/session/SessionManager.java   |  10 +-
 .../apache/accumulo/test/ConditionalWriterIT.java  |   2 +-
 .../compaction/ExternalDoNothingCompactor.java     |   5 +-
 .../apache/accumulo/test/functional/ScanIdIT.java  |   2 +-
 .../accumulo/test/manager/SuspendedTabletsIT.java  |   2 +-
 .../test/performance/scan/CollectTabletStats.java  |   4 +-
 41 files changed, 383 insertions(+), 173 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/classloader/DefaultContextClassLoaderFactory.java b/core/src/main/java/org/apache/accumulo/core/classloader/DefaultContextClassLoaderFactory.java
index e19c38b..6baaa64 100644
--- a/core/src/main/java/org/apache/accumulo/core/classloader/DefaultContextClassLoaderFactory.java
+++ b/core/src/main/java/org/apache/accumulo/core/classloader/DefaultContextClassLoaderFactory.java
@@ -22,6 +22,7 @@ import static java.util.concurrent.TimeUnit.MINUTES;
 
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -62,7 +63,7 @@ public class DefaultContextClassLoaderFactory implements ContextClassLoaderFacto
 
   private static void startCleanupThread(final AccumuloConfiguration conf,
       final Supplier<Map<String,String>> contextConfigSupplier) {
-    ThreadPools.createGeneralScheduledExecutorService(conf)
+    ScheduledFuture<?> future = ThreadPools.createGeneralScheduledExecutorService(conf)
         .scheduleWithFixedDelay(Threads.createNamedRunnable(className + "-cleanup", () -> {
           LOG.trace("{}-cleanup thread, properties: {}", className, conf);
           Set<String> contextsInUse = contextConfigSupplier.get().keySet().stream()
@@ -71,6 +72,7 @@ public class DefaultContextClassLoaderFactory implements ContextClassLoaderFacto
           LOG.trace("{}-cleanup thread, contexts in use: {}", className, contextsInUse);
           AccumuloVFSClassLoader.removeUnusedContexts(contextsInUse);
         }), 1, 1, MINUTES);
+    ThreadPools.watchNonCriticalScheduledTask(future);
     LOG.debug("Context cleanup timer started at 60s intervals");
   }
 
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
index ff0d596..d566bae 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
@@ -38,6 +38,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.DelayQueue;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -113,6 +114,7 @@ class ConditionalWriterImpl implements ConditionalWriter {
   private Map<String,ServerQueue> serverQueues;
   private DelayQueue<QCMutation> failedMutations = new DelayQueue<>();
   private ScheduledThreadPoolExecutor threadPool;
+  private final ScheduledFuture<?> failureTaskFuture;
 
   private class RQIterator implements Iterator<Result> {
 
@@ -380,12 +382,15 @@ class ConditionalWriterImpl implements ConditionalWriter {
         queue(mutations);
     };
 
-    threadPool.scheduleAtFixedRate(failureHandler, 250, 250, MILLISECONDS);
+    failureTaskFuture = threadPool.scheduleAtFixedRate(failureHandler, 250, 250, MILLISECONDS);
   }
 
   @Override
   public Iterator<Result> write(Iterator<ConditionalMutation> mutations) {
 
+    ThreadPools.ensureRunning(failureTaskFuture,
+        "Background task that re-queues failed mutations has exited.");
+
     BlockingQueue<Result> resultQueue = new LinkedBlockingQueue<>();
 
     List<QCMutation> mutationList = new ArrayList<>();
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
index 9047622..b2374bc 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
@@ -37,6 +37,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -126,6 +127,7 @@ public class TabletServerBatchWriter implements AutoCloseable {
 
   // latency timers
   private final ScheduledThreadPoolExecutor executor;
+  private ScheduledFuture<?> latencyTimerFuture;
   private final Map<String,TimeoutTracker> timeoutTrackers =
       Collections.synchronizedMap(new HashMap<>());
 
@@ -214,17 +216,18 @@ public class TabletServerBatchWriter implements AutoCloseable {
     this.writer = new MutationWriter(config.getMaxWriteThreads());
 
     if (this.maxLatency != Long.MAX_VALUE) {
-      executor.scheduleWithFixedDelay(Threads.createNamedRunnable("BatchWriterLatencyTimer", () -> {
-        try {
-          synchronized (TabletServerBatchWriter.this) {
-            if ((System.currentTimeMillis() - lastProcessingStartTime)
-                > TabletServerBatchWriter.this.maxLatency)
-              startProcessing();
-          }
-        } catch (Exception e) {
-          updateUnknownErrors("Max latency task failed " + e.getMessage(), e);
-        }
-      }), 0, this.maxLatency / 4, MILLISECONDS);
+      latencyTimerFuture = executor
+          .scheduleWithFixedDelay(Threads.createNamedRunnable("BatchWriterLatencyTimer", () -> {
+            try {
+              synchronized (TabletServerBatchWriter.this) {
+                if ((System.currentTimeMillis() - lastProcessingStartTime)
+                    > TabletServerBatchWriter.this.maxLatency)
+                  startProcessing();
+              }
+            } catch (Exception e) {
+              updateUnknownErrors("Max latency task failed " + e.getMessage(), e);
+            }
+          }), 0, this.maxLatency / 4, MILLISECONDS);
     }
   }
 
@@ -248,6 +251,10 @@ public class TabletServerBatchWriter implements AutoCloseable {
       throw new IllegalStateException("Closed");
     if (m.size() == 0)
       throw new IllegalArgumentException("Can not add empty mutations");
+    if (this.latencyTimerFuture != null) {
+      ThreadPools.ensureRunning(this.latencyTimerFuture,
+          "Latency timer thread has exited, cannot guarantee latency target");
+    }
 
     checkForFailures();
 
@@ -576,14 +583,17 @@ public class TabletServerBatchWriter implements AutoCloseable {
     private MutationSet recentFailures = null;
     private long initTime;
     private final Runnable task;
+    private final ScheduledFuture<?> future;
 
     FailedMutations() {
       task =
           Threads.createNamedRunnable("failed mutationBatchWriterLatencyTimers handler", this::run);
-      executor.scheduleWithFixedDelay(task, 0, 500, MILLISECONDS);
+      future = executor.scheduleWithFixedDelay(task, 0, 500, MILLISECONDS);
     }
 
     private MutationSet init() {
+      ThreadPools.ensureRunning(future,
+          "Background task that re-queues failed mutations has exited.");
       if (recentFailures == null) {
         recentFailures = new MutationSet();
         initTime = System.currentTimeMillis();
@@ -775,7 +785,7 @@ public class TabletServerBatchWriter implements AutoCloseable {
 
       for (String server : servers)
         if (!queued.contains(server)) {
-          sendThreadPool.submit(new SendTask(server));
+          sendThreadPool.execute(new SendTask(server));
           queued.add(server);
         }
     }
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java
index b66fa36..ad426b1 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java
@@ -28,6 +28,7 @@ import java.util.Objects;
 import java.util.PriorityQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Supplier;
@@ -160,8 +161,9 @@ public class LruBlockCache extends SynchronousLoadingBlockCache implements Block
     } else {
       this.evictionThread = null;
     }
-    this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), statThreadPeriod,
-        statThreadPeriod, SECONDS);
+    ScheduledFuture<?> future = this.scheduleThreadPool.scheduleAtFixedRate(
+        new StatisticsThread(this), statThreadPeriod, statThreadPeriod, SECONDS);
+    ThreadPools.watchNonCriticalScheduledTask(future);
   }
 
   public long getOverhead() {
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java
index 6553319..9819923 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.function.Supplier;
 
 import org.apache.accumulo.core.file.blockfile.cache.impl.ClassSize;
@@ -71,7 +72,9 @@ public final class TinyLfuBlockCache implements BlockCache {
         }).maximumWeight(conf.getMaxSize(type)).recordStats().build();
     policy = cache.policy().eviction().get();
     maxSize = (int) Math.min(Integer.MAX_VALUE, policy.getMaximum());
-    statsExecutor.scheduleAtFixedRate(this::logStats, STATS_PERIOD_SEC, STATS_PERIOD_SEC, SECONDS);
+    ScheduledFuture<?> future = statsExecutor.scheduleAtFixedRate(this::logStats, STATS_PERIOD_SEC,
+        STATS_PERIOD_SEC, SECONDS);
+    ThreadPools.watchNonCriticalScheduledTask(future);
   }
 
   @Override
diff --git a/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java b/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java
index 17b7a52..6a0938f 100644
--- a/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java
+++ b/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java
@@ -396,7 +396,10 @@ public class Gatherer {
 
         // when all processing is done, check for failed files... and if found starting processing
         // again
-        future.thenRun(this::updateFuture);
+        @SuppressWarnings("unused")
+        CompletableFuture<Void> unused = future.thenRun(() -> {
+          CompletableFuture<ProcessedFiles> unused2 = this.updateFuture();
+        });
       } catch (Exception e) {
         future = CompletableFuture.completedFuture(new ProcessedFiles());
         // force future to have this exception
@@ -449,7 +452,8 @@ public class Gatherer {
 
     @Override
     public synchronized boolean isDone() {
-      updateFuture();
+      @SuppressWarnings("unused")
+      CompletableFuture<ProcessedFiles> unused = updateFuture();
       if (future.isDone()) {
         if (future.isCancelled() || future.isCompletedExceptionally()) {
           return true;
@@ -459,7 +463,7 @@ public class Gatherer {
         if (pf.failedFiles.isEmpty()) {
           return true;
         } else {
-          updateFuture();
+          unused = updateFuture();
         }
       }
 
diff --git a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java
index f31d26b..ae61337 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java
@@ -25,6 +25,7 @@ import java.lang.ref.WeakReference;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.WeakHashMap;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
@@ -43,6 +44,7 @@ public class SharedRateLimiterFactory {
   private static final long REPORT_RATE = 60000;
   private static final long UPDATE_RATE = 1000;
   private static SharedRateLimiterFactory instance = null;
+  private static ScheduledFuture<?> updateTaskFuture;
   private final Logger log = LoggerFactory.getLogger(SharedRateLimiterFactory.class);
   private final WeakHashMap<String,WeakReference<SharedRateLimiter>> activeLimiters =
       new WeakHashMap<>();
@@ -55,13 +57,14 @@ public class SharedRateLimiterFactory {
       instance = new SharedRateLimiterFactory();
 
       ScheduledThreadPoolExecutor svc = ThreadPools.createGeneralScheduledExecutorService(conf);
-      svc.scheduleWithFixedDelay(Threads
+      updateTaskFuture = svc.scheduleWithFixedDelay(Threads
           .createNamedRunnable("SharedRateLimiterFactory update polling", instance::updateAll),
           UPDATE_RATE, UPDATE_RATE, MILLISECONDS);
 
-      svc.scheduleWithFixedDelay(Threads
+      ScheduledFuture<?> future = svc.scheduleWithFixedDelay(Threads
           .createNamedRunnable("SharedRateLimiterFactory report polling", instance::reportAll),
           REPORT_RATE, REPORT_RATE, MILLISECONDS);
+      ThreadPools.watchNonCriticalScheduledTask(future);
 
     }
     return instance;
@@ -91,6 +94,9 @@ public class SharedRateLimiterFactory {
    */
   public RateLimiter create(String name, RateProvider rateProvider) {
     synchronized (activeLimiters) {
+      if (updateTaskFuture.isDone()) {
+        log.warn("SharedRateLimiterFactory update task has failed.");
+      }
       var limiterRef = activeLimiters.get(name);
       var limiter = limiterRef == null ? null : limiterRef.get();
       if (limiter == null) {
diff --git a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
index 67d08ba..afa05c0 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
@@ -21,9 +21,14 @@ package org.apache.accumulo.core.util.threads;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
 
+import java.util.Iterator;
+import java.util.List;
 import java.util.OptionalInt;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledFuture;
@@ -40,13 +45,132 @@ import org.apache.accumulo.core.trace.TraceUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@SuppressFBWarnings(value = "RV_EXCEPTION_NOT_THROWN",
+    justification = "Throwing Error for it to be caught by AccumuloUncaughtExceptionHandler")
 public class ThreadPools {
 
+  public static class ExecutionError extends Error {
+
+    private static final long serialVersionUID = 1L;
+
+    public ExecutionError(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
   private static final Logger LOG = LoggerFactory.getLogger(ThreadPools.class);
 
   // the number of seconds before we allow a thread to terminate with non-use.
   public static final long DEFAULT_TIMEOUT_MILLISECS = 180000L;
 
+  private static final ThreadPoolExecutor SCHEDULED_FUTURE_CHECKER_POOL =
+      createFixedThreadPool(1, "Scheduled Future Checker", false);
+
+  private static final ConcurrentLinkedQueue<ScheduledFuture<?>> CRITICAL_RUNNING_TASKS =
+      new ConcurrentLinkedQueue<>();
+
+  private static final ConcurrentLinkedQueue<ScheduledFuture<?>> NON_CRITICAL_RUNNING_TASKS =
+      new ConcurrentLinkedQueue<>();
+
+  private static Runnable TASK_CHECKER = new Runnable() {
+    @Override
+    public void run() {
+      final List<ConcurrentLinkedQueue<ScheduledFuture<?>>> queues =
+          List.of(CRITICAL_RUNNING_TASKS, NON_CRITICAL_RUNNING_TASKS);
+      while (true) {
+        queues.forEach(q -> {
+          Iterator<ScheduledFuture<?>> tasks = q.iterator();
+          while (tasks.hasNext()) {
+            if (checkTaskFailed(tasks.next(), q)) {
+              tasks.remove();
+            }
+          }
+        });
+        try {
+          TimeUnit.MINUTES.sleep(1);
+        } catch (InterruptedException ie) {
+          // This thread was interrupted by something while sleeping. We don't want to exit
+          // this thread, so reset the interrupt state on this thread and keep going.
+          Thread.interrupted();
+        }
+      }
+    }
+  };
+
+  /**
+   * Checks to see if a ScheduledFuture has exited successfully or thrown an error
+   *
+   * @param future
+   *          scheduled future to check
+   * @param taskQueue
+   *          the running task queue from which the future came
+   * @return true if the future should be removed
+   */
+  private static boolean checkTaskFailed(ScheduledFuture<?> future,
+      ConcurrentLinkedQueue<ScheduledFuture<?>> taskQueue) {
+    // Calling get() on a ScheduledFuture will block unless that scheduled task has
+    // completed. We call isDone() here instead. If the scheduled task is done then
+    // either it was a one-shot task, cancelled or an exception was thrown.
+    if (future.isDone()) {
+      // Now call get() to see if we get an exception.
+      try {
+        future.get();
+        // If we get here, then a scheduled task exited but did not throw an error
+        // or get canceled. This was likely a one-shot scheduled task (I don't think
+        // we can tell if it's one-shot or not, I think we have to assume that it is
+        // and that a recurring task would not normally be complete).
+        return true;
+      } catch (ExecutionException ee) {
+        // An exception was thrown in the critical task. Throw the error here, which
+        // will then be caught by the AccumuloUncaughtExceptionHandler which will
+        // log the error and terminate the VM.
+        if (taskQueue == CRITICAL_RUNNING_TASKS) {
+          throw new ExecutionError("Critical scheduled background task failed.", ee);
+        } else {
+          LOG.error("Non-critical scheduled background task failed", ee);
+          return true;
+        }
+      } catch (CancellationException ce) {
+        // do nothing here as it appears that the task was canceled. Remove it from
+        // the list of critical tasks
+        return true;
+      } catch (InterruptedException ie) {
+        // current thread was interrupted waiting for get to return, which in theory,
+        // shouldn't happen since the task is done.
+        LOG.info("Interrupted while waiting to check on scheduled background task.");
+        // Reset the interrupt state on this thread
+        Thread.interrupted();
+      }
+    }
+    return false;
+  }
+
+  static {
+    SCHEDULED_FUTURE_CHECKER_POOL.execute(TASK_CHECKER);
+  }
+
+  public static void watchCriticalScheduledTask(ScheduledFuture<?> future) {
+    CRITICAL_RUNNING_TASKS.add(future);
+  }
+
+  public static void watchNonCriticalScheduledTask(ScheduledFuture<?> future) {
+    NON_CRITICAL_RUNNING_TASKS.add(future);
+  }
+
+  public static void ensureRunning(ScheduledFuture<?> future, String message) {
+    if (future.isDone()) {
+      try {
+        future.get();
+      } catch (Exception e) {
+        throw new IllegalStateException(message, e);
+      }
+      // it exited w/o exception, but we still expect it to be running so throw an exception.
+      throw new IllegalStateException(message);
+    }
+  }
+
   /**
    * Resize ThreadPoolExecutor based on current value of maxThreads
    *
diff --git a/core/src/main/java/org/apache/accumulo/fate/Fate.java b/core/src/main/java/org/apache/accumulo/fate/Fate.java
index 0ca8a0a..d467dc3 100644
--- a/core/src/main/java/org/apache/accumulo/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/fate/Fate.java
@@ -240,7 +240,7 @@ public class Fate<T> {
     final ThreadPoolExecutor pool =
         ThreadPools.createExecutorService(conf, Property.MANAGER_FATE_THREADPOOL_SIZE, true);
     fatePoolWatcher = ThreadPools.createGeneralScheduledExecutorService(conf);
-    fatePoolWatcher.schedule(() -> {
+    ThreadPools.watchCriticalScheduledTask(fatePoolWatcher.schedule(() -> {
       // resize the pool if the property changed
       ThreadPools.resizePool(pool, conf, Property.MANAGER_FATE_THREADPOOL_SIZE);
       // If the pool grew, then ensure that there is a TransactionRunner for each thread
@@ -262,7 +262,7 @@ public class Fate<T> {
           }
         }
       }
-    }, 3, SECONDS);
+    }, 3, SECONDS));
     executor = pool;
   }
 
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java
index 0358c66..9ce5649 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java
@@ -248,7 +248,7 @@ public class MultiThreadedRFileTest {
           }
         };
         for (int i = 0; i < maxThreads; i++) {
-          pool.submit(runnable);
+          pool.execute(runnable);
         }
       } finally {
         pool.shutdown();
diff --git a/pom.xml b/pom.xml
index 363e64e..a638932 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1674,7 +1674,7 @@
                 <arg>-XDcompilePolicy=simple</arg>
                 <arg>
                   -Xplugin:ErrorProne \
-                  -XepExcludedPaths:.*/(proto|thrift|generated-sources)/.* \
+                  -XepExcludedPaths:.*/(proto|thrift|generated-sources|src/test)/.* \
                   -XepDisableWarningsInGeneratedCode \
                   -XepDisableAllWarnings \
                   <!-- error/warning patterns to ignore -->
@@ -1682,6 +1682,7 @@
                   -Xep:CheckReturnValue:OFF \
                   -Xep:MustBeClosedChecker:OFF \
                   -Xep:ReturnValueIgnored:OFF \
+                  -Xep:FutureReturnValueIgnored:ERROR \
                   -Xep:UnicodeInCode:OFF \
                   <!-- error/warning patterns to specifically check -->
                   -Xep:ExpectedExceptionChecker \
diff --git a/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java b/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
index aa59057..5d2bafd 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
@@ -34,6 +34,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
@@ -423,7 +424,7 @@ public class ServerContext extends ClientContext {
   }
 
   private void monitorSwappiness() {
-    getScheduledExecutor().scheduleWithFixedDelay(() -> {
+    ScheduledFuture<?> future = getScheduledExecutor().scheduleWithFixedDelay(() -> {
       try {
         String procFile = "/proc/sys/vm/swappiness";
         File swappiness = new File(procFile);
@@ -445,6 +446,7 @@ public class ServerContext extends ClientContext {
         log.error("", t);
       }
     }, SECONDS.toMillis(1), MINUTES.toMillis(10), TimeUnit.MILLISECONDS);
+    ThreadPools.watchNonCriticalScheduledTask(future);
   }
 
   /**
diff --git a/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java b/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
index a290c85..d68c5ea 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
@@ -153,7 +153,7 @@ public class BulkImporter {
               assignments.put(mapFile, tabletsToAssignMapFileTo);
           }
         };
-        threadPool.submit(getAssignments);
+        threadPool.execute(getAssignments);
       }
       threadPool.shutdown();
       while (!threadPool.isTerminated()) {
@@ -396,7 +396,7 @@ public class BulkImporter {
         }
       };
 
-      threadPool.submit(estimationTask);
+      threadPool.execute(estimationTask);
     }
 
     threadPool.shutdown();
@@ -541,7 +541,7 @@ public class BulkImporter {
     for (Entry<String,Map<KeyExtent,List<PathSize>>> entry : assignmentsPerTabletServer
         .entrySet()) {
       String location = entry.getKey();
-      threadPool.submit(new AssignmentTask(assignmentFailures, location, entry.getValue()));
+      threadPool.execute(new AssignmentTask(assignmentFailures, location, entry.getValue()));
     }
 
     threadPool.shutdown();
diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionWatcher.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionWatcher.java
index ba5e5fc..d5c987a 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionWatcher.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionWatcher.java
@@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.server.ServerContext;
 import org.slf4j.LoggerFactory;
 
@@ -122,8 +123,8 @@ public class CompactionWatcher implements Runnable {
 
   public static synchronized void startWatching(ServerContext context) {
     if (!watching) {
-      context.getScheduledExecutor().scheduleWithFixedDelay(
-          new CompactionWatcher(context.getConfiguration()), 10000, 10000, TimeUnit.MILLISECONDS);
+      ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor().scheduleWithFixedDelay(
+          new CompactionWatcher(context.getConfiguration()), 10000, 10000, TimeUnit.MILLISECONDS));
       watching = true;
     }
   }
diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java b/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java
index 4763f59..cd276e5 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java
@@ -50,6 +50,7 @@ import org.apache.accumulo.core.iteratorsImpl.system.TimeSettingIterator;
 import org.apache.accumulo.core.metadata.TabletFile;
 import org.apache.accumulo.core.metadata.schema.DataFileValue;
 import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
+import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.problems.ProblemReport;
 import org.apache.accumulo.server.problems.ProblemReportingIterator;
@@ -163,8 +164,9 @@ public class FileManager {
     this.reservedReaders = new HashMap<>();
 
     this.maxIdleTime = this.context.getConfiguration().getTimeInMillis(Property.TSERV_MAX_IDLE);
-    this.context.getScheduledExecutor().scheduleWithFixedDelay(new IdleFileCloser(), maxIdleTime,
-        maxIdleTime / 2, TimeUnit.MILLISECONDS);
+    ThreadPools.watchCriticalScheduledTask(
+        this.context.getScheduledExecutor().scheduleWithFixedDelay(new IdleFileCloser(),
+            maxIdleTime, maxIdleTime / 2, TimeUnit.MILLISECONDS));
 
     this.slowFilePermitMillis =
         this.context.getConfiguration().getTimeInMillis(Property.TSERV_SLOW_FILEPERMIT_MILLIS);
diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
index a88af20..fbfec53 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
@@ -45,6 +45,7 @@ import org.apache.accumulo.core.util.AddressUtil;
 import org.apache.accumulo.core.util.Halt;
 import org.apache.accumulo.core.util.HostAndPort;
 import org.apache.accumulo.core.util.ServerServices;
+import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.fate.zookeeper.ServiceLock;
 import org.apache.accumulo.fate.zookeeper.ZooCache;
 import org.apache.accumulo.fate.zookeeper.ZooCache.ZcStat;
@@ -254,8 +255,9 @@ public class LiveTServerSet implements Watcher {
 
   public synchronized void startListeningForTabletServerChanges() {
     scanServers();
-    this.context.getScheduledExecutor().scheduleWithFixedDelay(this::scanServers, 0, 5000,
-        TimeUnit.MILLISECONDS);
+
+    ThreadPools.watchCriticalScheduledTask(this.context.getScheduledExecutor()
+        .scheduleWithFixedDelay(this::scanServers, 0, 5000, TimeUnit.MILLISECONDS));
   }
 
   public synchronized void scanServers() {
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
index eeccf7e..80e48cf 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
@@ -314,20 +314,22 @@ public class TServerUtils {
     final ThreadPoolExecutor pool = ThreadPools.createFixedThreadPool(executorThreads,
         threadTimeOut, TimeUnit.MILLISECONDS, serverName + "-ClientPool", true);
     // periodically adjust the number of threads we need by checking how busy our threads are
-    ThreadPools.createGeneralScheduledExecutorService(conf).scheduleWithFixedDelay(() -> {
-      // there is a minor race condition between sampling the current state of the thread pool and
-      // adjusting it
-      // however, this isn't really an issue, since it adjusts periodically anyway
-      if (pool.getCorePoolSize() <= pool.getActiveCount()) {
-        int larger = pool.getCorePoolSize() + Math.min(pool.getQueue().size(), 2);
-        ThreadPools.resizePool(pool, () -> larger, serverName + "-ClientPool");
-      } else {
-        if (pool.getCorePoolSize() > pool.getActiveCount() + 3) {
-          int smaller = Math.max(executorThreads, pool.getCorePoolSize() - 1);
-          ThreadPools.resizePool(pool, () -> smaller, serverName + "-ClientPool");
-        }
-      }
-    }, timeBetweenThreadChecks, timeBetweenThreadChecks, TimeUnit.MILLISECONDS);
+    ThreadPools.watchCriticalScheduledTask(
+        ThreadPools.createGeneralScheduledExecutorService(conf).scheduleWithFixedDelay(() -> {
+          // there is a minor race condition between sampling the current state of the thread pool
+          // and
+          // adjusting it
+          // however, this isn't really an issue, since it adjusts periodically anyway
+          if (pool.getCorePoolSize() <= pool.getActiveCount()) {
+            int larger = pool.getCorePoolSize() + Math.min(pool.getQueue().size(), 2);
+            ThreadPools.resizePool(pool, () -> larger, serverName + "-ClientPool");
+          } else {
+            if (pool.getCorePoolSize() > pool.getActiveCount() + 3) {
+              int smaller = Math.max(executorThreads, pool.getCorePoolSize() - 1);
+              ThreadPools.resizePool(pool, () -> smaller, serverName + "-ClientPool");
+            }
+          }
+        }, timeBetweenThreadChecks, timeBetweenThreadChecks, TimeUnit.MILLISECONDS));
     return pool;
   }
 
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/FileSystemMonitor.java b/server/base/src/main/java/org/apache/accumulo/server/util/FileSystemMonitor.java
index 39ce84a..11d19cd 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/FileSystemMonitor.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/FileSystemMonitor.java
@@ -117,20 +117,20 @@ public class FileSystemMonitor {
 
     // Create a task to check each mount periodically to see if its state has changed.
     for (Mount mount : mounts) {
-      ThreadPools.createGeneralScheduledExecutorService(conf).scheduleWithFixedDelay(
-          Threads.createNamedRunnable(mount.mountPoint + "filesystem monitor", () -> {
-            try {
-              checkMount(mount);
-            } catch (final Exception e) {
-              Halt.halt(-42, new Runnable() {
-                @Override
-                public void run() {
-                  log.error("Exception while checking mount points, halting process", e);
+      ThreadPools.watchCriticalScheduledTask(
+          ThreadPools.createGeneralScheduledExecutorService(conf).scheduleWithFixedDelay(
+              Threads.createNamedRunnable(mount.mountPoint + "filesystem monitor", () -> {
+                try {
+                  checkMount(mount);
+                } catch (final Exception e) {
+                  Halt.halt(-42, new Runnable() {
+                    @Override
+                    public void run() {
+                      log.error("Exception while checking mount points, halting process", e);
+                    }
+                  });
                 }
-              });
-            }
-          }), period, period, TimeUnit.MILLISECONDS);
-
+              }), period, period, TimeUnit.MILLISECONDS));
     }
 
   }
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/RemoveEntriesForMissingFiles.java b/server/base/src/main/java/org/apache/accumulo/server/util/RemoveEntriesForMissingFiles.java
index 14914f4..ba22df7 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/RemoveEntriesForMissingFiles.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/RemoveEntriesForMissingFiles.java
@@ -156,7 +156,7 @@ public class RemoveEntriesForMissingFiles {
         processing.add(map);
       }
 
-      threadPool.submit(
+      threadPool.execute(
           new CheckFileTask(cache, fs, missing, writer, key, map, processing, exceptionRef));
     }
 
diff --git a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
index 3a424dc..cbbf81b 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
@@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
@@ -217,19 +218,20 @@ public class DistributedWorkQueue {
     lookForWork(processor, children);
 
     // Add a little jitter to avoid all the tservers slamming zookeeper at once
-    context.getScheduledExecutor().scheduleWithFixedDelay(new Runnable() {
-      @Override
-      public void run() {
-        log.debug("Looking for work in {}", path);
-        try {
-          lookForWork(processor, zoo.getChildren(path));
-        } catch (KeeperException e) {
-          log.error("Failed to look for work", e);
-        } catch (InterruptedException e) {
-          log.info("Interrupted looking for work", e);
-        }
-      }
-    }, timerInitialDelay, timerPeriod, TimeUnit.MILLISECONDS);
+    ThreadPools.watchCriticalScheduledTask(
+        context.getScheduledExecutor().scheduleWithFixedDelay(new Runnable() {
+          @Override
+          public void run() {
+            log.debug("Looking for work in {}", path);
+            try {
+              lookForWork(processor, zoo.getChildren(path));
+            } catch (KeeperException e) {
+              log.error("Failed to look for work", e);
+            } catch (InterruptedException e) {
+              log.info("Interrupted looking for work", e);
+            }
+          }
+        }, timerInitialDelay, timerPeriod, TimeUnit.MILLISECONDS));
   }
 
   /**
@@ -240,6 +242,7 @@ public class DistributedWorkQueue {
   }
 
   public void addWork(String workId, byte[] data) throws KeeperException, InterruptedException {
+
     if (workId.equalsIgnoreCase(LOCKS_NODE))
       throw new IllegalArgumentException("locks is reserved work id");
 
diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index d534062..77a5c3b 100644
--- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -28,6 +28,7 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
@@ -162,12 +163,16 @@ public class CompactionCoordinator extends AbstractServer
   }
 
   protected void startGCLogger(ScheduledThreadPoolExecutor schedExecutor) {
-    schedExecutor.scheduleWithFixedDelay(() -> gcLogger.logGCInfo(getConfiguration()), 0,
-        TIME_BETWEEN_GC_CHECKS, TimeUnit.MILLISECONDS);
+    ScheduledFuture<?> future =
+        schedExecutor.scheduleWithFixedDelay(() -> gcLogger.logGCInfo(getConfiguration()), 0,
+            TIME_BETWEEN_GC_CHECKS, TimeUnit.MILLISECONDS);
+    ThreadPools.watchNonCriticalScheduledTask(future);
   }
 
   protected void startCompactionCleaner(ScheduledThreadPoolExecutor schedExecutor) {
-    schedExecutor.scheduleWithFixedDelay(() -> cleanUpCompactors(), 0, 5, TimeUnit.MINUTES);
+    ScheduledFuture<?> future =
+        schedExecutor.scheduleWithFixedDelay(() -> cleanUpCompactors(), 0, 5, TimeUnit.MINUTES);
+    ThreadPools.watchNonCriticalScheduledTask(future);
   }
 
   protected void printStartupMsg() {
diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
index 424deb7..196ac2a 100644
--- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
@@ -83,8 +83,8 @@ public class CompactionFinalizer {
       processPending();
     });
 
-    schedExecutor.scheduleWithFixedDelay(() -> notifyTservers(), 0, tserverCheckInterval,
-        TimeUnit.MILLISECONDS);
+    ThreadPools.watchCriticalScheduledTask(schedExecutor.scheduleWithFixedDelay(
+        () -> notifyTservers(), 0, tserverCheckInterval, TimeUnit.MILLISECONDS));
   }
 
   public void commitCompaction(ExternalCompactionId ecid, KeyExtent extent, long fileSize,
diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java
index e4ee3bd..5a9e526 100644
--- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java
@@ -34,6 +34,7 @@ import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
 import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
 import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
+import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.server.ServerContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -138,12 +139,12 @@ public class DeadCompactionDetector {
     long interval = this.context.getConfiguration()
         .getTimeInMillis(Property.COMPACTION_COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL);
 
-    schedExecutor.scheduleWithFixedDelay(() -> {
+    ThreadPools.watchCriticalScheduledTask(schedExecutor.scheduleWithFixedDelay(() -> {
       try {
         detectDeadCompactions();
       } catch (RuntimeException e) {
         log.warn("Failed to look for dead compactions", e);
       }
-    }, 0, interval, TimeUnit.MILLISECONDS);
+    }, 0, interval, TimeUnit.MILLISECONDS));
   }
 }
diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 14b0545..d1cc457 100644
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -31,6 +31,7 @@ import java.util.Map;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -196,14 +197,16 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac
   }
 
   protected void startGCLogger(ScheduledThreadPoolExecutor schedExecutor) {
-    schedExecutor.scheduleWithFixedDelay(() -> gcLogger.logGCInfo(getConfiguration()), 0,
-        TIME_BETWEEN_GC_CHECKS, TimeUnit.MILLISECONDS);
+    ScheduledFuture<?> future =
+        schedExecutor.scheduleWithFixedDelay(() -> gcLogger.logGCInfo(getConfiguration()), 0,
+            TIME_BETWEEN_GC_CHECKS, TimeUnit.MILLISECONDS);
+    ThreadPools.watchNonCriticalScheduledTask(future);
   }
 
   protected void startCancelChecker(ScheduledThreadPoolExecutor schedExecutor,
       long timeBetweenChecks) {
-    schedExecutor.scheduleWithFixedDelay(() -> checkIfCanceled(), 0, timeBetweenChecks,
-        TimeUnit.MILLISECONDS);
+    ThreadPools.watchCriticalScheduledTask(schedExecutor.scheduleWithFixedDelay(
+        () -> checkIfCanceled(), 0, timeBetweenChecks, TimeUnit.MILLISECONDS));
   }
 
   protected void checkIfCanceled() {
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 7071569..7f360de 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -41,6 +41,7 @@ import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -252,11 +253,12 @@ public class Manager extends AbstractServer
     if (newState == ManagerState.STOP) {
       // Give the server a little time before shutdown so the client
       // thread requesting the stop can return
-      getContext().getScheduledExecutor().scheduleWithFixedDelay(() -> {
+      ScheduledFuture<?> future = getContext().getScheduledExecutor().scheduleWithFixedDelay(() -> {
         // This frees the main thread and will cause the manager to exit
         clientService.stop();
         Manager.this.nextEvent.event("stopped event loop");
       }, 100L, 1000L, TimeUnit.MILLISECONDS);
+      ThreadPools.watchNonCriticalScheduledTask(future);
     }
 
     if (oldState != newState && (newState == ManagerState.HAVE_LOCK)) {
@@ -930,7 +932,7 @@ public class Manager extends AbstractServer
         // unresponsive tservers.
         sleepUninterruptibly(Math.max(1, rpcTimeout / 120_000), TimeUnit.MILLISECONDS);
       }
-      tp.submit(() -> {
+      tp.execute(() -> {
         try {
           Thread t = Thread.currentThread();
           String oldName = t.getName();
@@ -1150,8 +1152,8 @@ public class Manager extends AbstractServer
       fate = new Fate<>(this, store, TraceRepo::toLogString);
       fate.startTransactionRunners(getConfiguration());
 
-      context.getScheduledExecutor().scheduleWithFixedDelay(store::ageOff, 63000, 63000,
-          TimeUnit.MILLISECONDS);
+      ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor()
+          .scheduleWithFixedDelay(store::ageOff, 63000, 63000, TimeUnit.MILLISECONDS));
     } catch (KeeperException | InterruptedException e) {
       throw new IllegalStateException("Exception setting up FaTE cleanup thread", e);
     }
@@ -1198,7 +1200,7 @@ public class Manager extends AbstractServer
 
     // if the replication name is ever set, then start replication services
     final AtomicReference<TServer> replServer = new AtomicReference<>();
-    context.getScheduledExecutor().scheduleWithFixedDelay(() -> {
+    ScheduledFuture<?> future = context.getScheduledExecutor().scheduleWithFixedDelay(() -> {
       try {
         @SuppressWarnings("deprecation")
         Property p = Property.REPLICATION_NAME;
@@ -1210,6 +1212,7 @@ public class Manager extends AbstractServer
         log.error("Error occurred starting replication services. ", e);
       }
     }, 0, 5000, TimeUnit.MILLISECONDS);
+    ThreadPools.watchNonCriticalScheduledTask(future);
 
     // checking stored user hashes if any of them uses an outdated algorithm
     security.validateStoredUserCreditentials();
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/ManagerTime.java b/server/manager/src/main/java/org/apache/accumulo/manager/ManagerTime.java
index 41077bd..237a207 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/ManagerTime.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/ManagerTime.java
@@ -64,9 +64,9 @@ public class ManagerTime {
       throw new IOException("Error updating manager time", ex);
     }
 
-    ThreadPools.createGeneralScheduledExecutorService(conf).scheduleWithFixedDelay(
-        Threads.createNamedRunnable("Manager time keeper", () -> run()), 0, SECONDS.toMillis(10),
-        MILLISECONDS);
+    ThreadPools.watchCriticalScheduledTask(ThreadPools.createGeneralScheduledExecutorService(conf)
+        .scheduleWithFixedDelay(Threads.createNamedRunnable("Manager time keeper", () -> run()), 0,
+            SECONDS.toMillis(10), MILLISECONDS));
   }
 
   /**
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ReplicationMetrics.java b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ReplicationMetrics.java
index 79eb5ac..c90337b 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ReplicationMetrics.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ReplicationMetrics.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -165,8 +166,9 @@ public class ReplicationMetrics implements MetricsProducer {
         ThreadPools.createScheduledExecutorService(1, "replicationMetricsPoller", false);
     Runtime.getRuntime().addShutdownHook(new Thread(scheduler::shutdownNow));
     long minimumRefreshDelay = TimeUnit.SECONDS.toMillis(5);
-    scheduler.scheduleAtFixedRate(this::update, minimumRefreshDelay, minimumRefreshDelay,
-        TimeUnit.MILLISECONDS);
+    ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(this::update, minimumRefreshDelay,
+        minimumRefreshDelay, TimeUnit.MILLISECONDS);
+    ThreadPools.watchNonCriticalScheduledTask(future);
   }
 
 }
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java
index 055092f..42e18db 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.manager.metrics.fate;
 
 import java.util.Map.Entry;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -155,14 +156,14 @@ public class FateMetrics implements MetricsProducer {
         ThreadPools.createScheduledExecutorService(1, "fateMetricsPoller", false);
     Runtime.getRuntime().addShutdownHook(new Thread(scheduler::shutdownNow));
 
-    scheduler.scheduleAtFixedRate(() -> {
+    ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(() -> {
       try {
         update();
       } catch (Exception ex) {
         log.info("Failed to update fate metrics due to exception", ex);
       }
     }, refreshDelay, refreshDelay, TimeUnit.MILLISECONDS);
-
+    ThreadPools.watchNonCriticalScheduledTask(future);
   }
 
 }
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java b/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java
index 0503963..a1ba264 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java
@@ -30,6 +30,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.Constants;
@@ -105,7 +106,8 @@ public class RecoveryManager {
             manager.getVolumeManager(), new Path(source));
 
         if (time > 0) {
-          executor.schedule(this, time, TimeUnit.MILLISECONDS);
+          ScheduledFuture<?> future = executor.schedule(this, time, TimeUnit.MILLISECONDS);
+          ThreadPools.watchNonCriticalScheduledTask(future);
           rescheduled = true;
         } else {
           initiateSort(sortId, source, destination);
@@ -210,8 +212,9 @@ public class RecoveryManager {
             log.info("Starting recovery of {} (in : {}s), tablet {} holds a reference", filename,
                 (delay / 1000), extent);
 
-            executor.schedule(new LogSortTask(closer, filename, dest, sortId), delay,
-                TimeUnit.MILLISECONDS);
+            ScheduledFuture<?> future = executor.schedule(
+                new LogSortTask(closer, filename, dest, sortId), delay, TimeUnit.MILLISECONDS);
+            ThreadPools.watchNonCriticalScheduledTask(future);
             closeTasksQueued.add(sortId);
             recoveryDelay.put(sortId, delay);
           }
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/util/logging/AccumuloMonitorAppender.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/util/logging/AccumuloMonitorAppender.java
index 368e2a6..f62bb8b 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/util/logging/AccumuloMonitorAppender.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/util/logging/AccumuloMonitorAppender.java
@@ -26,8 +26,10 @@ import java.net.URI;
 import java.net.http.HttpClient;
 import java.net.http.HttpRequest;
 import java.net.http.HttpRequest.BodyPublishers;
+import java.net.http.HttpResponse;
 import java.net.http.HttpResponse.BodyHandlers;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;
 
 import org.apache.accumulo.core.Constants;
@@ -122,7 +124,9 @@ public class AccumuloMonitorAppender extends AbstractAppender {
 
         var req = HttpRequest.newBuilder(uri).POST(BodyPublishers.ofString(jsonEvent, UTF_8))
             .setHeader("Content-Type", "application/json").build();
-        httpClient.sendAsync(req, BodyHandlers.discarding());
+        @SuppressWarnings("unused")
+        CompletableFuture<HttpResponse<Void>> future =
+            httpClient.sendAsync(req, BodyHandlers.discarding());
       } catch (final Exception e) {
         error("Unable to send HTTP in appender [" + getName() + "]", event, e);
       }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java
index df7f5fd..ece1d9c 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java
@@ -33,6 +33,7 @@ import org.apache.accumulo.core.manager.thrift.TabletLoadState;
 import org.apache.accumulo.core.metadata.RootTable;
 import org.apache.accumulo.core.metadata.TServerInstance;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.core.util.threads.Threads;
 import org.apache.accumulo.server.manager.state.Assignment;
 import org.apache.accumulo.server.manager.state.TabletStateStore;
@@ -220,22 +221,24 @@ class AssignmentHandler implements Runnable {
       server.enqueueManagerMessage(new TabletStatusMessage(TabletLoadState.LOAD_FAILURE, extent));
       long reschedule = Math.min((1L << Math.min(32, retryAttempt)) * 1000, MINUTES.toMillis(10));
       log.warn(String.format("rescheduling tablet load in %.2f seconds", reschedule / 1000.));
-      this.server.getContext().getScheduledExecutor().schedule(new Runnable() {
-        @Override
-        public void run() {
-          log.info("adding tablet {} back to the assignment pool (retry {})", extent, retryAttempt);
-          AssignmentHandler handler = new AssignmentHandler(server, extent, retryAttempt + 1);
-          if (extent.isMeta()) {
-            if (extent.isRootTablet()) {
-              Threads.createThread("Root tablet assignment retry", handler).start();
-            } else {
-              server.resourceManager.addMetaDataAssignment(extent, log, handler);
+      ThreadPools.watchCriticalScheduledTask(
+          this.server.getContext().getScheduledExecutor().schedule(new Runnable() {
+            @Override
+            public void run() {
+              log.info("adding tablet {} back to the assignment pool (retry {})", extent,
+                  retryAttempt);
+              AssignmentHandler handler = new AssignmentHandler(server, extent, retryAttempt + 1);
+              if (extent.isMeta()) {
+                if (extent.isRootTablet()) {
+                  Threads.createThread("Root tablet assignment retry", handler).start();
+                } else {
+                  server.resourceManager.addMetaDataAssignment(extent, log, handler);
+                }
+              } else {
+                server.resourceManager.addAssignment(extent, log, handler);
+              }
             }
-          } else {
-            server.resourceManager.addAssignment(extent, log, handler);
-          }
-        }
-      }, reschedule, TimeUnit.MILLISECONDS);
+          }, reschedule, TimeUnit.MILLISECONDS));
     }
   }
 
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index c4146c1..c53bb91 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -50,6 +50,7 @@ import java.util.UUID;
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -260,7 +261,7 @@ public class TabletServer extends AbstractServer {
     // This thread will calculate and log out the busiest tablets based on ingest count and
     // query count every #{logBusiestTabletsDelay}
     if (numBusyTabletsToLog > 0) {
-      context.getScheduledExecutor()
+      ScheduledFuture<?> future = context.getScheduledExecutor()
           .scheduleWithFixedDelay(Threads.createNamedRunnable("BusyTabletLogger", new Runnable() {
             private BusiestTracker ingestTracker =
                 BusiestTracker.newBusiestIngestTracker(numBusyTabletsToLog);
@@ -285,9 +286,10 @@ public class TabletServer extends AbstractServer {
               }
             }
           }), logBusyTabletsDelay, logBusyTabletsDelay, TimeUnit.MILLISECONDS);
+      ThreadPools.watchNonCriticalScheduledTask(future);
     }
 
-    context.getScheduledExecutor()
+    ScheduledFuture<?> future = context.getScheduledExecutor()
         .scheduleWithFixedDelay(Threads.createNamedRunnable("TabletRateUpdater", new Runnable() {
           @Override
           public void run() {
@@ -301,6 +303,7 @@ public class TabletServer extends AbstractServer {
             }
           }
         }), 5, 5, TimeUnit.SECONDS);
+    ThreadPools.watchNonCriticalScheduledTask(future);
 
     @SuppressWarnings("deprecation")
     final long walMaxSize =
@@ -348,8 +351,8 @@ public class TabletServer extends AbstractServer {
     this.resourceManager = new TabletServerResourceManager(context);
     this.security = AuditedSecurityOperation.getInstance(context);
 
-    context.getScheduledExecutor().scheduleWithFixedDelay(TabletLocator::clearLocators, jitter(),
-        jitter(), TimeUnit.MILLISECONDS);
+    ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor().scheduleWithFixedDelay(
+        TabletLocator::clearLocators, jitter(), jitter(), TimeUnit.MILLISECONDS));
     walMarker = new WalStateManager(context);
 
     // Create the secret manager
@@ -791,7 +794,7 @@ public class TabletServer extends AbstractServer {
     // if the replication name is ever set, then start replication services
     @SuppressWarnings("deprecation")
     Property p = Property.REPLICATION_NAME;
-    context.getScheduledExecutor().scheduleWithFixedDelay(() -> {
+    ScheduledFuture<?> future = context.getScheduledExecutor().scheduleWithFixedDelay(() -> {
       if (this.replServer == null) {
         if (!getConfiguration().get(p).isEmpty()) {
           log.info(p.getKey() + " was set, starting repl services.");
@@ -799,36 +802,40 @@ public class TabletServer extends AbstractServer {
         }
       }
     }, 0, 5, TimeUnit.SECONDS);
+    ThreadPools.watchNonCriticalScheduledTask(future);
 
     int tabletCheckFrequency = 30 + random.nextInt(31); // random 30-60 minute delay
     // Periodically check that metadata of tablets matches what is held in memory
-    ThreadPools.createGeneralScheduledExecutorService(aconf).scheduleWithFixedDelay(() -> {
-      final SortedMap<KeyExtent,Tablet> onlineTabletsSnapshot = onlineTablets.snapshot();
-
-      Map<KeyExtent,Long> updateCounts = new HashMap<>();
-
-      // gather updateCounts for each tablet
-      onlineTabletsSnapshot.forEach((ke, tablet) -> {
-        updateCounts.put(ke, tablet.getUpdateCount());
-      });
-
-      // gather metadata for all tablets readTablets()
-      try (TabletsMetadata tabletsMetadata = getContext().getAmple().readTablets()
-          .forTablets(onlineTabletsSnapshot.keySet()).fetch(FILES, LOGS, ECOMP, PREV_ROW).build()) {
-
-        // for each tablet, compare its metadata to what is held in memory
-        tabletsMetadata.forEach(tabletMetadata -> {
-          KeyExtent extent = tabletMetadata.getExtent();
-          Tablet tablet = onlineTabletsSnapshot.get(extent);
-          Long counter = updateCounts.get(extent);
-          tablet.compareTabletInfo(counter, tabletMetadata);
-        });
-      }
-    }, tabletCheckFrequency, tabletCheckFrequency, TimeUnit.MINUTES);
+    ThreadPools.watchCriticalScheduledTask(
+        ThreadPools.createGeneralScheduledExecutorService(aconf).scheduleWithFixedDelay(() -> {
+          final SortedMap<KeyExtent,Tablet> onlineTabletsSnapshot = onlineTablets.snapshot();
+
+          Map<KeyExtent,Long> updateCounts = new HashMap<>();
+
+          // gather updateCounts for each tablet
+          onlineTabletsSnapshot.forEach((ke, tablet) -> {
+            updateCounts.put(ke, tablet.getUpdateCount());
+          });
+
+          // gather metadata for all tablets readTablets()
+          try (TabletsMetadata tabletsMetadata =
+              getContext().getAmple().readTablets().forTablets(onlineTabletsSnapshot.keySet())
+                  .fetch(FILES, LOGS, ECOMP, PREV_ROW).build()) {
+
+            // for each tablet, compare its metadata to what is held in memory
+            tabletsMetadata.forEach(tabletMetadata -> {
+              KeyExtent extent = tabletMetadata.getExtent();
+              Tablet tablet = onlineTabletsSnapshot.get(extent);
+              Long counter = updateCounts.get(extent);
+              tablet.compareTabletInfo(counter, tabletMetadata);
+            });
+          }
+        }, tabletCheckFrequency, tabletCheckFrequency, TimeUnit.MINUTES));
 
     final long CLEANUP_BULK_LOADED_CACHE_MILLIS = TimeUnit.MINUTES.toMillis(15);
-    context.getScheduledExecutor().scheduleWithFixedDelay(new BulkImportCacheCleaner(this),
-        CLEANUP_BULK_LOADED_CACHE_MILLIS, CLEANUP_BULK_LOADED_CACHE_MILLIS, TimeUnit.MILLISECONDS);
+    ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor().scheduleWithFixedDelay(
+        new BulkImportCacheCleaner(this), CLEANUP_BULK_LOADED_CACHE_MILLIS,
+        CLEANUP_BULK_LOADED_CACHE_MILLIS, TimeUnit.MILLISECONDS));
 
     HostAndPort managerHost;
     while (!serverStopRequested) {
@@ -953,8 +960,9 @@ public class TabletServer extends AbstractServer {
     Runnable replicationWorkThreadPoolResizer = () -> {
       ThreadPools.resizePool(replicationThreadPool, aconf, Property.REPLICATION_WORKER_THREADS);
     };
-    context.getScheduledExecutor().scheduleWithFixedDelay(replicationWorkThreadPoolResizer, 10, 30,
-        TimeUnit.SECONDS);
+    ScheduledFuture<?> future = context.getScheduledExecutor()
+        .scheduleWithFixedDelay(replicationWorkThreadPoolResizer, 10, 30, TimeUnit.SECONDS);
+    ThreadPools.watchNonCriticalScheduledTask(future);
   }
 
   public String getClientAddressString() {
@@ -1019,8 +1027,9 @@ public class TabletServer extends AbstractServer {
 
     Runnable gcDebugTask = () -> gcLogger.logGCInfo(getConfiguration());
 
-    context.getScheduledExecutor().scheduleWithFixedDelay(gcDebugTask, 0, TIME_BETWEEN_GC_CHECKS,
-        TimeUnit.MILLISECONDS);
+    ScheduledFuture<?> future = context.getScheduledExecutor().scheduleWithFixedDelay(gcDebugTask,
+        0, TIME_BETWEEN_GC_CHECKS, TimeUnit.MILLISECONDS);
+    ThreadPools.watchNonCriticalScheduledTask(future);
   }
 
   public TabletServerStatus getStats(Map<TableId,MapCounter<ScanRunState>> scanCounts) {
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index 190b022..23ed47f 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@ -137,8 +137,8 @@ public class TabletServerResourceManager {
    */
   private void modifyThreadPoolSizesAtRuntime(IntSupplier maxThreads, String name,
       final ThreadPoolExecutor tp) {
-    context.getScheduledExecutor().scheduleWithFixedDelay(
-        () -> ThreadPools.resizePool(tp, maxThreads, name), 1, 10, TimeUnit.SECONDS);
+    ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor().scheduleWithFixedDelay(
+        () -> ThreadPools.resizePool(tp, maxThreads, name), 1, 10, TimeUnit.SECONDS));
   }
 
   private ThreadPoolExecutor createPriorityExecutor(ScanExecutorConfig sec,
@@ -378,8 +378,8 @@ public class TabletServerResourceManager {
 
     // We can use the same map for both metadata and normal assignments since the keyspace (extent)
     // is guaranteed to be unique. Schedule the task once, the task will reschedule itself.
-    context.getScheduledExecutor().schedule(
-        new AssignmentWatcher(acuConf, context, activeAssignments), 5000, TimeUnit.MILLISECONDS);
+    ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor().schedule(
+        new AssignmentWatcher(acuConf, context, activeAssignments), 5000, TimeUnit.MILLISECONDS));
   }
 
   /**
@@ -439,7 +439,8 @@ public class TabletServerResourceManager {
         if (log.isTraceEnabled()) {
           log.trace("Rescheduling assignment watcher to run in {}ms", delay);
         }
-        context.getScheduledExecutor().schedule(this, delay, TimeUnit.MILLISECONDS);
+        ThreadPools.watchCriticalScheduledTask(
+            context.getScheduledExecutor().schedule(this, delay, TimeUnit.MILLISECONDS));
       }
     }
   }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index b884e61..92089a0 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -263,7 +263,7 @@ public class TabletServerLogger {
       return;
     }
     nextLogMaker = ThreadPools.createFixedThreadPool(1, "WALog creator", true);
-    nextLogMaker.submit(new Runnable() {
+    nextLogMaker.execute(new Runnable() {
       @Override
       public void run() {
         final ServerResources conf = tserver.getServerConfig();
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/CompactionExecutorsMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/CompactionExecutorsMetrics.java
index e5d79ad..955d3a2 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/CompactionExecutorsMetrics.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/CompactionExecutorsMetrics.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.IntSupplier;
@@ -68,9 +69,9 @@ public class CompactionExecutorsMetrics implements MetricsProducer {
         ThreadPools.createScheduledExecutorService(1, "compactionExecutorsMetricsPoller", false);
     Runtime.getRuntime().addShutdownHook(new Thread(scheduler::shutdownNow));
     long minimumRefreshDelay = TimeUnit.SECONDS.toMillis(5);
-    scheduler.scheduleAtFixedRate(this::update, minimumRefreshDelay, minimumRefreshDelay,
-        TimeUnit.MILLISECONDS);
-
+    ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(this::update, minimumRefreshDelay,
+        minimumRefreshDelay, TimeUnit.MILLISECONDS);
+    ThreadPools.watchNonCriticalScheduledTask(future);
   }
 
   public synchronized AutoCloseable addExecutor(CompactionExecutorId ceid,
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
index ec10d73..ae43495 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
@@ -29,6 +29,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -72,8 +73,8 @@ public class SessionManager {
 
     Runnable r = () -> sweep(maxIdle, maxUpdateIdle);
 
-    context.getScheduledExecutor().scheduleWithFixedDelay(r, 0, Math.max(maxIdle / 2, 1000),
-        TimeUnit.MILLISECONDS);
+    ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor().scheduleWithFixedDelay(r,
+        0, Math.max(maxIdle / 2, 1000), TimeUnit.MILLISECONDS));
   }
 
   public long createSession(Session session, boolean reserve) {
@@ -278,8 +279,9 @@ public class SessionManager {
         }
       };
 
-      ThreadPools.createGeneralScheduledExecutorService(aconf).schedule(r, delay,
-          TimeUnit.MILLISECONDS);
+      ScheduledFuture<?> future = ThreadPools.createGeneralScheduledExecutorService(aconf)
+          .schedule(r, delay, TimeUnit.MILLISECONDS);
+      ThreadPools.watchNonCriticalScheduledTask(future);
     }
   }
 
diff --git a/test/src/main/java/org/apache/accumulo/test/ConditionalWriterIT.java b/test/src/main/java/org/apache/accumulo/test/ConditionalWriterIT.java
index 48ee3c1..86a2b08 100644
--- a/test/src/main/java/org/apache/accumulo/test/ConditionalWriterIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ConditionalWriterIT.java
@@ -1247,7 +1247,7 @@ public class ConditionalWriterIT extends SharedMiniClusterBase {
 
         ExecutorService tp = Executors.newFixedThreadPool(5);
         for (int i = 0; i < 5; i++) {
-          tp.submit(new MutatorTask(tableName, client, rows, cw, failed));
+          tp.execute(new MutatorTask(tableName, client, rows, cw, failed));
         }
 
         tp.shutdown();
diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java
index 87b5c9f..a6cbc6f 100644
--- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java
+++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java
@@ -19,6 +19,7 @@
 package org.apache.accumulo.test.compaction;
 
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
@@ -45,7 +46,9 @@ public class ExternalDoNothingCompactor extends Compactor implements Iface {
   @Override
   protected void startCancelChecker(ScheduledThreadPoolExecutor schedExecutor,
       long timeBetweenChecks) {
-    schedExecutor.scheduleWithFixedDelay(() -> checkIfCanceled(), 0, 5000, TimeUnit.MILLISECONDS);
+    @SuppressWarnings("unused")
+    ScheduledFuture<?> future = schedExecutor.scheduleWithFixedDelay(() -> checkIfCanceled(), 0,
+        5000, TimeUnit.MILLISECONDS);
   }
 
   @Override
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java
index bebd7ec..0e74000 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java
@@ -128,7 +128,7 @@ public class ScanIdIT extends AccumuloClusterHarness {
       for (int scannerIndex = 0; scannerIndex < NUM_SCANNERS; scannerIndex++) {
         ScannerThread st = new ScannerThread(client, scannerIndex, tableName, latch);
         scanThreadsToClose.add(st);
-        pool.submit(st);
+        pool.execute(st);
       }
 
       // wait for scanners to report a result.
diff --git a/test/src/main/java/org/apache/accumulo/test/manager/SuspendedTabletsIT.java b/test/src/main/java/org/apache/accumulo/test/manager/SuspendedTabletsIT.java
index 4d17c4c..3905872 100644
--- a/test/src/main/java/org/apache/accumulo/test/manager/SuspendedTabletsIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/manager/SuspendedTabletsIT.java
@@ -410,7 +410,7 @@ public class SuspendedTabletsIT extends ConfigurableMacBase {
             answer.scan(ctx, tableName, metaName);
             return answer;
           });
-          THREAD_POOL.submit(tlsFuture);
+          THREAD_POOL.execute(tlsFuture);
           return tlsFuture.get(5, SECONDS);
         } catch (TimeoutException ex) {
           log.debug("Retrieval timed out", ex);
diff --git a/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java b/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java
index 4f93d3f..1358ab7 100644
--- a/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java
+++ b/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java
@@ -233,7 +233,7 @@ public class CollectTabletStats {
       }
 
       for (final KeyExtent ke : tabletsToTest) {
-        threadPool.submit(() -> {
+        threadPool.execute(() -> {
           try {
             calcTabletStats(client, opts.tableName, opts.auths, ke, columns);
           } catch (Exception e) {
@@ -318,7 +318,7 @@ public class CollectTabletStats {
     CountDownLatch finishedSignal = new CountDownLatch(numThreads);
 
     for (Test test : tests) {
-      threadPool.submit(test);
+      threadPool.execute(test);
       test.setSignals(startSignal, finishedSignal);
     }