You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text rendering.)
Posted to commits@accumulo.apache.org by dl...@apache.org on 2022/03/04 12:59:56 UTC
[accumulo] branch main updated: Check scheduled tasks periodically for errors (#2524)
This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 7fb1b5d Check scheduled tasks periodically for errors (#2524)
7fb1b5d is described below
commit 7fb1b5df9e3abe18304039fb3e781b2617eec4b5
Author: Dave Marion <dl...@apache.org>
AuthorDate: Fri Mar 4 07:59:03 2022 -0500
Check scheduled tasks periodically for errors (#2524)
Updated code that uses ScheduledExecutorServices but was not retaining a reference to the ScheduledFuture object returned by the methods that create scheduled tasks. Where the code was adding a non-scheduled task via the `submit` method, I changed it to use the `execute` method. For code that was adding a scheduled task but not doing anything with the returned ScheduledFuture, I modified the code to pass the ScheduledFuture reference to a utility method in ThreadPools. The new ThreadPools ut [...]
---
.../DefaultContextClassLoaderFactory.java | 4 +-
.../core/clientImpl/ConditionalWriterImpl.java | 7 +-
.../core/clientImpl/TabletServerBatchWriter.java | 36 +++---
.../file/blockfile/cache/lru/LruBlockCache.java | 6 +-
.../blockfile/cache/tinylfu/TinyLfuBlockCache.java | 5 +-
.../org/apache/accumulo/core/summary/Gatherer.java | 10 +-
.../util/ratelimit/SharedRateLimiterFactory.java | 10 +-
.../accumulo/core/util/threads/ThreadPools.java | 124 +++++++++++++++++++++
.../main/java/org/apache/accumulo/fate/Fate.java | 4 +-
.../core/file/rfile/MultiThreadedRFileTest.java | 2 +-
pom.xml | 3 +-
.../org/apache/accumulo/server/ServerContext.java | 4 +-
.../accumulo/server/client/BulkImporter.java | 6 +-
.../server/compaction/CompactionWatcher.java | 5 +-
.../org/apache/accumulo/server/fs/FileManager.java | 6 +-
.../accumulo/server/manager/LiveTServerSet.java | 6 +-
.../apache/accumulo/server/rpc/TServerUtils.java | 30 ++---
.../accumulo/server/util/FileSystemMonitor.java | 26 ++---
.../server/util/RemoveEntriesForMissingFiles.java | 2 +-
.../server/zookeeper/DistributedWorkQueue.java | 29 ++---
.../coordinator/CompactionCoordinator.java | 11 +-
.../accumulo/coordinator/CompactionFinalizer.java | 4 +-
.../coordinator/DeadCompactionDetector.java | 5 +-
.../org/apache/accumulo/compactor/Compactor.java | 11 +-
.../java/org/apache/accumulo/manager/Manager.java | 13 ++-
.../org/apache/accumulo/manager/ManagerTime.java | 6 +-
.../manager/metrics/ReplicationMetrics.java | 6 +-
.../accumulo/manager/metrics/fate/FateMetrics.java | 5 +-
.../accumulo/manager/recovery/RecoveryManager.java | 9 +-
.../util/logging/AccumuloMonitorAppender.java | 6 +-
.../apache/accumulo/tserver/AssignmentHandler.java | 33 +++---
.../org/apache/accumulo/tserver/TabletServer.java | 77 +++++++------
.../tserver/TabletServerResourceManager.java | 11 +-
.../accumulo/tserver/log/TabletServerLogger.java | 2 +-
.../metrics/CompactionExecutorsMetrics.java | 7 +-
.../accumulo/tserver/session/SessionManager.java | 10 +-
.../apache/accumulo/test/ConditionalWriterIT.java | 2 +-
.../compaction/ExternalDoNothingCompactor.java | 5 +-
.../apache/accumulo/test/functional/ScanIdIT.java | 2 +-
.../accumulo/test/manager/SuspendedTabletsIT.java | 2 +-
.../test/performance/scan/CollectTabletStats.java | 4 +-
41 files changed, 383 insertions(+), 173 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/classloader/DefaultContextClassLoaderFactory.java b/core/src/main/java/org/apache/accumulo/core/classloader/DefaultContextClassLoaderFactory.java
index e19c38b..6baaa64 100644
--- a/core/src/main/java/org/apache/accumulo/core/classloader/DefaultContextClassLoaderFactory.java
+++ b/core/src/main/java/org/apache/accumulo/core/classloader/DefaultContextClassLoaderFactory.java
@@ -22,6 +22,7 @@ import static java.util.concurrent.TimeUnit.MINUTES;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -62,7 +63,7 @@ public class DefaultContextClassLoaderFactory implements ContextClassLoaderFacto
private static void startCleanupThread(final AccumuloConfiguration conf,
final Supplier<Map<String,String>> contextConfigSupplier) {
- ThreadPools.createGeneralScheduledExecutorService(conf)
+ ScheduledFuture<?> future = ThreadPools.createGeneralScheduledExecutorService(conf)
.scheduleWithFixedDelay(Threads.createNamedRunnable(className + "-cleanup", () -> {
LOG.trace("{}-cleanup thread, properties: {}", className, conf);
Set<String> contextsInUse = contextConfigSupplier.get().keySet().stream()
@@ -71,6 +72,7 @@ public class DefaultContextClassLoaderFactory implements ContextClassLoaderFacto
LOG.trace("{}-cleanup thread, contexts in use: {}", className, contextsInUse);
AccumuloVFSClassLoader.removeUnusedContexts(contextsInUse);
}), 1, 1, MINUTES);
+ ThreadPools.watchNonCriticalScheduledTask(future);
LOG.debug("Context cleanup timer started at 60s intervals");
}
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
index ff0d596..d566bae 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
@@ -38,6 +38,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -113,6 +114,7 @@ class ConditionalWriterImpl implements ConditionalWriter {
private Map<String,ServerQueue> serverQueues;
private DelayQueue<QCMutation> failedMutations = new DelayQueue<>();
private ScheduledThreadPoolExecutor threadPool;
+ private final ScheduledFuture<?> failureTaskFuture;
private class RQIterator implements Iterator<Result> {
@@ -380,12 +382,15 @@ class ConditionalWriterImpl implements ConditionalWriter {
queue(mutations);
};
- threadPool.scheduleAtFixedRate(failureHandler, 250, 250, MILLISECONDS);
+ failureTaskFuture = threadPool.scheduleAtFixedRate(failureHandler, 250, 250, MILLISECONDS);
}
@Override
public Iterator<Result> write(Iterator<ConditionalMutation> mutations) {
+ ThreadPools.ensureRunning(failureTaskFuture,
+ "Background task that re-queues failed mutations has exited.");
+
BlockingQueue<Result> resultQueue = new LinkedBlockingQueue<>();
List<QCMutation> mutationList = new ArrayList<>();
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
index 9047622..b2374bc 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
@@ -37,6 +37,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeSet;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -126,6 +127,7 @@ public class TabletServerBatchWriter implements AutoCloseable {
// latency timers
private final ScheduledThreadPoolExecutor executor;
+ private ScheduledFuture<?> latencyTimerFuture;
private final Map<String,TimeoutTracker> timeoutTrackers =
Collections.synchronizedMap(new HashMap<>());
@@ -214,17 +216,18 @@ public class TabletServerBatchWriter implements AutoCloseable {
this.writer = new MutationWriter(config.getMaxWriteThreads());
if (this.maxLatency != Long.MAX_VALUE) {
- executor.scheduleWithFixedDelay(Threads.createNamedRunnable("BatchWriterLatencyTimer", () -> {
- try {
- synchronized (TabletServerBatchWriter.this) {
- if ((System.currentTimeMillis() - lastProcessingStartTime)
- > TabletServerBatchWriter.this.maxLatency)
- startProcessing();
- }
- } catch (Exception e) {
- updateUnknownErrors("Max latency task failed " + e.getMessage(), e);
- }
- }), 0, this.maxLatency / 4, MILLISECONDS);
+ latencyTimerFuture = executor
+ .scheduleWithFixedDelay(Threads.createNamedRunnable("BatchWriterLatencyTimer", () -> {
+ try {
+ synchronized (TabletServerBatchWriter.this) {
+ if ((System.currentTimeMillis() - lastProcessingStartTime)
+ > TabletServerBatchWriter.this.maxLatency)
+ startProcessing();
+ }
+ } catch (Exception e) {
+ updateUnknownErrors("Max latency task failed " + e.getMessage(), e);
+ }
+ }), 0, this.maxLatency / 4, MILLISECONDS);
}
}
@@ -248,6 +251,10 @@ public class TabletServerBatchWriter implements AutoCloseable {
throw new IllegalStateException("Closed");
if (m.size() == 0)
throw new IllegalArgumentException("Can not add empty mutations");
+ if (this.latencyTimerFuture != null) {
+ ThreadPools.ensureRunning(this.latencyTimerFuture,
+ "Latency timer thread has exited, cannot guarantee latency target");
+ }
checkForFailures();
@@ -576,14 +583,17 @@ public class TabletServerBatchWriter implements AutoCloseable {
private MutationSet recentFailures = null;
private long initTime;
private final Runnable task;
+ private final ScheduledFuture<?> future;
FailedMutations() {
task =
Threads.createNamedRunnable("failed mutationBatchWriterLatencyTimers handler", this::run);
- executor.scheduleWithFixedDelay(task, 0, 500, MILLISECONDS);
+ future = executor.scheduleWithFixedDelay(task, 0, 500, MILLISECONDS);
}
private MutationSet init() {
+ ThreadPools.ensureRunning(future,
+ "Background task that re-queues failed mutations has exited.");
if (recentFailures == null) {
recentFailures = new MutationSet();
initTime = System.currentTimeMillis();
@@ -775,7 +785,7 @@ public class TabletServerBatchWriter implements AutoCloseable {
for (String server : servers)
if (!queued.contains(server)) {
- sendThreadPool.submit(new SendTask(server));
+ sendThreadPool.execute(new SendTask(server));
queued.add(server);
}
}
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java
index b66fa36..ad426b1 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java
@@ -28,6 +28,7 @@ import java.util.Objects;
import java.util.PriorityQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
@@ -160,8 +161,9 @@ public class LruBlockCache extends SynchronousLoadingBlockCache implements Block
} else {
this.evictionThread = null;
}
- this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), statThreadPeriod,
- statThreadPeriod, SECONDS);
+ ScheduledFuture<?> future = this.scheduleThreadPool.scheduleAtFixedRate(
+ new StatisticsThread(this), statThreadPeriod, statThreadPeriod, SECONDS);
+ ThreadPools.watchNonCriticalScheduledTask(future);
}
public long getOverhead() {
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java
index 6553319..9819923 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.function.Supplier;
import org.apache.accumulo.core.file.blockfile.cache.impl.ClassSize;
@@ -71,7 +72,9 @@ public final class TinyLfuBlockCache implements BlockCache {
}).maximumWeight(conf.getMaxSize(type)).recordStats().build();
policy = cache.policy().eviction().get();
maxSize = (int) Math.min(Integer.MAX_VALUE, policy.getMaximum());
- statsExecutor.scheduleAtFixedRate(this::logStats, STATS_PERIOD_SEC, STATS_PERIOD_SEC, SECONDS);
+ ScheduledFuture<?> future = statsExecutor.scheduleAtFixedRate(this::logStats, STATS_PERIOD_SEC,
+ STATS_PERIOD_SEC, SECONDS);
+ ThreadPools.watchNonCriticalScheduledTask(future);
}
@Override
diff --git a/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java b/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java
index 17b7a52..6a0938f 100644
--- a/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java
+++ b/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java
@@ -396,7 +396,10 @@ public class Gatherer {
// when all processing is done, check for failed files... and if found starting processing
// again
- future.thenRun(this::updateFuture);
+ @SuppressWarnings("unused")
+ CompletableFuture<Void> unused = future.thenRun(() -> {
+ CompletableFuture<ProcessedFiles> unused2 = this.updateFuture();
+ });
} catch (Exception e) {
future = CompletableFuture.completedFuture(new ProcessedFiles());
// force future to have this exception
@@ -449,7 +452,8 @@ public class Gatherer {
@Override
public synchronized boolean isDone() {
- updateFuture();
+ @SuppressWarnings("unused")
+ CompletableFuture<ProcessedFiles> unused = updateFuture();
if (future.isDone()) {
if (future.isCancelled() || future.isCompletedExceptionally()) {
return true;
@@ -459,7 +463,7 @@ public class Gatherer {
if (pf.failedFiles.isEmpty()) {
return true;
} else {
- updateFuture();
+ unused = updateFuture();
}
}
diff --git a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java
index f31d26b..ae61337 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java
@@ -25,6 +25,7 @@ import java.lang.ref.WeakReference;
import java.util.HashMap;
import java.util.Map;
import java.util.WeakHashMap;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
@@ -43,6 +44,7 @@ public class SharedRateLimiterFactory {
private static final long REPORT_RATE = 60000;
private static final long UPDATE_RATE = 1000;
private static SharedRateLimiterFactory instance = null;
+ private static ScheduledFuture<?> updateTaskFuture;
private final Logger log = LoggerFactory.getLogger(SharedRateLimiterFactory.class);
private final WeakHashMap<String,WeakReference<SharedRateLimiter>> activeLimiters =
new WeakHashMap<>();
@@ -55,13 +57,14 @@ public class SharedRateLimiterFactory {
instance = new SharedRateLimiterFactory();
ScheduledThreadPoolExecutor svc = ThreadPools.createGeneralScheduledExecutorService(conf);
- svc.scheduleWithFixedDelay(Threads
+ updateTaskFuture = svc.scheduleWithFixedDelay(Threads
.createNamedRunnable("SharedRateLimiterFactory update polling", instance::updateAll),
UPDATE_RATE, UPDATE_RATE, MILLISECONDS);
- svc.scheduleWithFixedDelay(Threads
+ ScheduledFuture<?> future = svc.scheduleWithFixedDelay(Threads
.createNamedRunnable("SharedRateLimiterFactory report polling", instance::reportAll),
REPORT_RATE, REPORT_RATE, MILLISECONDS);
+ ThreadPools.watchNonCriticalScheduledTask(future);
}
return instance;
@@ -91,6 +94,9 @@ public class SharedRateLimiterFactory {
*/
public RateLimiter create(String name, RateProvider rateProvider) {
synchronized (activeLimiters) {
+ if (updateTaskFuture.isDone()) {
+ log.warn("SharedRateLimiterFactory update task has failed.");
+ }
var limiterRef = activeLimiters.get(name);
var limiter = limiterRef == null ? null : limiterRef.get();
if (limiter == null) {
diff --git a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
index 67d08ba..afa05c0 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
@@ -21,9 +21,14 @@ package org.apache.accumulo.core.util.threads;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
+import java.util.Iterator;
+import java.util.List;
import java.util.OptionalInt;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledFuture;
@@ -40,13 +45,132 @@ import org.apache.accumulo.core.trace.TraceUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@SuppressFBWarnings(value = "RV_EXCEPTION_NOT_THROWN",
+ justification = "Throwing Error for it to be caught by AccumuloUncaughtExceptionHandler")
public class ThreadPools {
+ public static class ExecutionError extends Error {
+
+ private static final long serialVersionUID = 1L;
+
+ public ExecutionError(String message, Throwable cause) {
+ super(message, cause);
+ }
+ }
+
private static final Logger LOG = LoggerFactory.getLogger(ThreadPools.class);
// the number of seconds before we allow a thread to terminate with non-use.
public static final long DEFAULT_TIMEOUT_MILLISECS = 180000L;
+ private static final ThreadPoolExecutor SCHEDULED_FUTURE_CHECKER_POOL =
+ createFixedThreadPool(1, "Scheduled Future Checker", false);
+
+ private static final ConcurrentLinkedQueue<ScheduledFuture<?>> CRITICAL_RUNNING_TASKS =
+ new ConcurrentLinkedQueue<>();
+
+ private static final ConcurrentLinkedQueue<ScheduledFuture<?>> NON_CRITICAL_RUNNING_TASKS =
+ new ConcurrentLinkedQueue<>();
+
+ private static Runnable TASK_CHECKER = new Runnable() {
+ @Override
+ public void run() {
+ final List<ConcurrentLinkedQueue<ScheduledFuture<?>>> queues =
+ List.of(CRITICAL_RUNNING_TASKS, NON_CRITICAL_RUNNING_TASKS);
+ while (true) {
+ queues.forEach(q -> {
+ Iterator<ScheduledFuture<?>> tasks = q.iterator();
+ while (tasks.hasNext()) {
+ if (checkTaskFailed(tasks.next(), q)) {
+ tasks.remove();
+ }
+ }
+ });
+ try {
+ TimeUnit.MINUTES.sleep(1);
+ } catch (InterruptedException ie) {
+ // This thread was interrupted by something while sleeping. We don't want to exit
+ // this thread, so reset the interrupt state on this thread and keep going.
+ Thread.interrupted();
+ }
+ }
+ }
+ };
+
+ /**
+ * Checks to see if a ScheduledFuture has exited successfully or thrown an error
+ *
+ * @param future
+ * scheduled future to check
+ * @param taskQueue
+ * the running task queue from which the future came
+ * @return true if the future should be removed
+ */
+ private static boolean checkTaskFailed(ScheduledFuture<?> future,
+ ConcurrentLinkedQueue<ScheduledFuture<?>> taskQueue) {
+ // Calling get() on a ScheduledFuture will block unless that scheduled task has
+ // completed. We call isDone() here instead. If the scheduled task is done then
+ // either it was a one-shot task, cancelled or an exception was thrown.
+ if (future.isDone()) {
+ // Now call get() to see if we get an exception.
+ try {
+ future.get();
+ // If we get here, then a scheduled task exited but did not throw an error
+ // or get canceled. This was likely a one-shot scheduled task (I don't think
+ // we can tell if it's one-shot or not, I think we have to assume that it is
+ // and that a recurring task would not normally be complete).
+ return true;
+ } catch (ExecutionException ee) {
+ // An exception was thrown in the critical task. Throw the error here, which
+ // will then be caught by the AccumuloUncaughtExceptionHandler which will
+ // log the error and terminate the VM.
+ if (taskQueue == CRITICAL_RUNNING_TASKS) {
+ throw new ExecutionError("Critical scheduled background task failed.", ee);
+ } else {
+ LOG.error("Non-critical scheduled background task failed", ee);
+ return true;
+ }
+ } catch (CancellationException ce) {
+ // do nothing here as it appears that the task was canceled. Remove it from
+ // the list of critical tasks
+ return true;
+ } catch (InterruptedException ie) {
+ // current thread was interrupted waiting for get to return, which in theory,
+ // shouldn't happen since the task is done.
+ LOG.info("Interrupted while waiting to check on scheduled background task.");
+ // Reset the interrupt state on this thread
+ Thread.interrupted();
+ }
+ }
+ return false;
+ }
+
+ static {
+ SCHEDULED_FUTURE_CHECKER_POOL.execute(TASK_CHECKER);
+ }
+
+ public static void watchCriticalScheduledTask(ScheduledFuture<?> future) {
+ CRITICAL_RUNNING_TASKS.add(future);
+ }
+
+ public static void watchNonCriticalScheduledTask(ScheduledFuture<?> future) {
+ NON_CRITICAL_RUNNING_TASKS.add(future);
+ }
+
+ public static void ensureRunning(ScheduledFuture<?> future, String message) {
+ if (future.isDone()) {
+ try {
+ future.get();
+ } catch (Exception e) {
+ throw new IllegalStateException(message, e);
+ }
+ // it exited w/o exception, but we still expect it to be running so throw an exception.
+ throw new IllegalStateException(message);
+ }
+ }
+
/**
* Resize ThreadPoolExecutor based on current value of maxThreads
*
diff --git a/core/src/main/java/org/apache/accumulo/fate/Fate.java b/core/src/main/java/org/apache/accumulo/fate/Fate.java
index 0ca8a0a..d467dc3 100644
--- a/core/src/main/java/org/apache/accumulo/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/fate/Fate.java
@@ -240,7 +240,7 @@ public class Fate<T> {
final ThreadPoolExecutor pool =
ThreadPools.createExecutorService(conf, Property.MANAGER_FATE_THREADPOOL_SIZE, true);
fatePoolWatcher = ThreadPools.createGeneralScheduledExecutorService(conf);
- fatePoolWatcher.schedule(() -> {
+ ThreadPools.watchCriticalScheduledTask(fatePoolWatcher.schedule(() -> {
// resize the pool if the property changed
ThreadPools.resizePool(pool, conf, Property.MANAGER_FATE_THREADPOOL_SIZE);
// If the pool grew, then ensure that there is a TransactionRunner for each thread
@@ -262,7 +262,7 @@ public class Fate<T> {
}
}
}
- }, 3, SECONDS);
+ }, 3, SECONDS));
executor = pool;
}
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java
index 0358c66..9ce5649 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java
@@ -248,7 +248,7 @@ public class MultiThreadedRFileTest {
}
};
for (int i = 0; i < maxThreads; i++) {
- pool.submit(runnable);
+ pool.execute(runnable);
}
} finally {
pool.shutdown();
diff --git a/pom.xml b/pom.xml
index 363e64e..a638932 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1674,7 +1674,7 @@
<arg>-XDcompilePolicy=simple</arg>
<arg>
-Xplugin:ErrorProne \
- -XepExcludedPaths:.*/(proto|thrift|generated-sources)/.* \
+ -XepExcludedPaths:.*/(proto|thrift|generated-sources|src/test)/.* \
-XepDisableWarningsInGeneratedCode \
-XepDisableAllWarnings \
<!-- error/warning patterns to ignore -->
@@ -1682,6 +1682,7 @@
-Xep:CheckReturnValue:OFF \
-Xep:MustBeClosedChecker:OFF \
-Xep:ReturnValueIgnored:OFF \
+ -Xep:FutureReturnValueIgnored:ERROR \
-Xep:UnicodeInCode:OFF \
<!-- error/warning patterns to specifically check -->
-Xep:ExpectedExceptionChecker \
diff --git a/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java b/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
index aa59057..5d2bafd 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
@@ -34,6 +34,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -423,7 +424,7 @@ public class ServerContext extends ClientContext {
}
private void monitorSwappiness() {
- getScheduledExecutor().scheduleWithFixedDelay(() -> {
+ ScheduledFuture<?> future = getScheduledExecutor().scheduleWithFixedDelay(() -> {
try {
String procFile = "/proc/sys/vm/swappiness";
File swappiness = new File(procFile);
@@ -445,6 +446,7 @@ public class ServerContext extends ClientContext {
log.error("", t);
}
}, SECONDS.toMillis(1), MINUTES.toMillis(10), TimeUnit.MILLISECONDS);
+ ThreadPools.watchNonCriticalScheduledTask(future);
}
/**
diff --git a/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java b/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
index a290c85..d68c5ea 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
@@ -153,7 +153,7 @@ public class BulkImporter {
assignments.put(mapFile, tabletsToAssignMapFileTo);
}
};
- threadPool.submit(getAssignments);
+ threadPool.execute(getAssignments);
}
threadPool.shutdown();
while (!threadPool.isTerminated()) {
@@ -396,7 +396,7 @@ public class BulkImporter {
}
};
- threadPool.submit(estimationTask);
+ threadPool.execute(estimationTask);
}
threadPool.shutdown();
@@ -541,7 +541,7 @@ public class BulkImporter {
for (Entry<String,Map<KeyExtent,List<PathSize>>> entry : assignmentsPerTabletServer
.entrySet()) {
String location = entry.getKey();
- threadPool.submit(new AssignmentTask(assignmentFailures, location, entry.getValue()));
+ threadPool.execute(new AssignmentTask(assignmentFailures, location, entry.getValue()));
}
threadPool.shutdown();
diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionWatcher.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionWatcher.java
index ba5e5fc..d5c987a 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionWatcher.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionWatcher.java
@@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.server.ServerContext;
import org.slf4j.LoggerFactory;
@@ -122,8 +123,8 @@ public class CompactionWatcher implements Runnable {
public static synchronized void startWatching(ServerContext context) {
if (!watching) {
- context.getScheduledExecutor().scheduleWithFixedDelay(
- new CompactionWatcher(context.getConfiguration()), 10000, 10000, TimeUnit.MILLISECONDS);
+ ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor().scheduleWithFixedDelay(
+ new CompactionWatcher(context.getConfiguration()), 10000, 10000, TimeUnit.MILLISECONDS));
watching = true;
}
}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java b/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java
index 4763f59..cd276e5 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java
@@ -50,6 +50,7 @@ import org.apache.accumulo.core.iteratorsImpl.system.TimeSettingIterator;
import org.apache.accumulo.core.metadata.TabletFile;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
+import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.problems.ProblemReport;
import org.apache.accumulo.server.problems.ProblemReportingIterator;
@@ -163,8 +164,9 @@ public class FileManager {
this.reservedReaders = new HashMap<>();
this.maxIdleTime = this.context.getConfiguration().getTimeInMillis(Property.TSERV_MAX_IDLE);
- this.context.getScheduledExecutor().scheduleWithFixedDelay(new IdleFileCloser(), maxIdleTime,
- maxIdleTime / 2, TimeUnit.MILLISECONDS);
+ ThreadPools.watchCriticalScheduledTask(
+ this.context.getScheduledExecutor().scheduleWithFixedDelay(new IdleFileCloser(),
+ maxIdleTime, maxIdleTime / 2, TimeUnit.MILLISECONDS));
this.slowFilePermitMillis =
this.context.getConfiguration().getTimeInMillis(Property.TSERV_SLOW_FILEPERMIT_MILLIS);
diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
index a88af20..fbfec53 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
@@ -45,6 +45,7 @@ import org.apache.accumulo.core.util.AddressUtil;
import org.apache.accumulo.core.util.Halt;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.core.util.ServerServices;
+import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.fate.zookeeper.ServiceLock;
import org.apache.accumulo.fate.zookeeper.ZooCache;
import org.apache.accumulo.fate.zookeeper.ZooCache.ZcStat;
@@ -254,8 +255,9 @@ public class LiveTServerSet implements Watcher {
public synchronized void startListeningForTabletServerChanges() {
scanServers();
- this.context.getScheduledExecutor().scheduleWithFixedDelay(this::scanServers, 0, 5000,
- TimeUnit.MILLISECONDS);
+
+ ThreadPools.watchCriticalScheduledTask(this.context.getScheduledExecutor()
+ .scheduleWithFixedDelay(this::scanServers, 0, 5000, TimeUnit.MILLISECONDS));
}
public synchronized void scanServers() {
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
index eeccf7e..80e48cf 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
@@ -314,20 +314,22 @@ public class TServerUtils {
final ThreadPoolExecutor pool = ThreadPools.createFixedThreadPool(executorThreads,
threadTimeOut, TimeUnit.MILLISECONDS, serverName + "-ClientPool", true);
// periodically adjust the number of threads we need by checking how busy our threads are
- ThreadPools.createGeneralScheduledExecutorService(conf).scheduleWithFixedDelay(() -> {
- // there is a minor race condition between sampling the current state of the thread pool and
- // adjusting it
- // however, this isn't really an issue, since it adjusts periodically anyway
- if (pool.getCorePoolSize() <= pool.getActiveCount()) {
- int larger = pool.getCorePoolSize() + Math.min(pool.getQueue().size(), 2);
- ThreadPools.resizePool(pool, () -> larger, serverName + "-ClientPool");
- } else {
- if (pool.getCorePoolSize() > pool.getActiveCount() + 3) {
- int smaller = Math.max(executorThreads, pool.getCorePoolSize() - 1);
- ThreadPools.resizePool(pool, () -> smaller, serverName + "-ClientPool");
- }
- }
- }, timeBetweenThreadChecks, timeBetweenThreadChecks, TimeUnit.MILLISECONDS);
+ ThreadPools.watchCriticalScheduledTask(
+ ThreadPools.createGeneralScheduledExecutorService(conf).scheduleWithFixedDelay(() -> {
+ // there is a minor race condition between sampling the current state of the thread pool
+ // and
+ // adjusting it
+ // however, this isn't really an issue, since it adjusts periodically anyway
+ if (pool.getCorePoolSize() <= pool.getActiveCount()) {
+ int larger = pool.getCorePoolSize() + Math.min(pool.getQueue().size(), 2);
+ ThreadPools.resizePool(pool, () -> larger, serverName + "-ClientPool");
+ } else {
+ if (pool.getCorePoolSize() > pool.getActiveCount() + 3) {
+ int smaller = Math.max(executorThreads, pool.getCorePoolSize() - 1);
+ ThreadPools.resizePool(pool, () -> smaller, serverName + "-ClientPool");
+ }
+ }
+ }, timeBetweenThreadChecks, timeBetweenThreadChecks, TimeUnit.MILLISECONDS));
return pool;
}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/FileSystemMonitor.java b/server/base/src/main/java/org/apache/accumulo/server/util/FileSystemMonitor.java
index 39ce84a..11d19cd 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/FileSystemMonitor.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/FileSystemMonitor.java
@@ -117,20 +117,20 @@ public class FileSystemMonitor {
// Create a task to check each mount periodically to see if its state has changed.
for (Mount mount : mounts) {
- ThreadPools.createGeneralScheduledExecutorService(conf).scheduleWithFixedDelay(
- Threads.createNamedRunnable(mount.mountPoint + "filesystem monitor", () -> {
- try {
- checkMount(mount);
- } catch (final Exception e) {
- Halt.halt(-42, new Runnable() {
- @Override
- public void run() {
- log.error("Exception while checking mount points, halting process", e);
+ ThreadPools.watchCriticalScheduledTask(
+ ThreadPools.createGeneralScheduledExecutorService(conf).scheduleWithFixedDelay(
+ Threads.createNamedRunnable(mount.mountPoint + "filesystem monitor", () -> {
+ try {
+ checkMount(mount);
+ } catch (final Exception e) {
+ Halt.halt(-42, new Runnable() {
+ @Override
+ public void run() {
+ log.error("Exception while checking mount points, halting process", e);
+ }
+ });
}
- });
- }
- }), period, period, TimeUnit.MILLISECONDS);
-
+ }), period, period, TimeUnit.MILLISECONDS));
}
}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/RemoveEntriesForMissingFiles.java b/server/base/src/main/java/org/apache/accumulo/server/util/RemoveEntriesForMissingFiles.java
index 14914f4..ba22df7 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/RemoveEntriesForMissingFiles.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/RemoveEntriesForMissingFiles.java
@@ -156,7 +156,7 @@ public class RemoveEntriesForMissingFiles {
processing.add(map);
}
- threadPool.submit(
+ threadPool.execute(
new CheckFileTask(cache, fs, missing, writer, key, map, processing, exceptionRef));
}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
index 3a424dc..cbbf81b 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
@@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
@@ -217,19 +218,20 @@ public class DistributedWorkQueue {
lookForWork(processor, children);
// Add a little jitter to avoid all the tservers slamming zookeeper at once
- context.getScheduledExecutor().scheduleWithFixedDelay(new Runnable() {
- @Override
- public void run() {
- log.debug("Looking for work in {}", path);
- try {
- lookForWork(processor, zoo.getChildren(path));
- } catch (KeeperException e) {
- log.error("Failed to look for work", e);
- } catch (InterruptedException e) {
- log.info("Interrupted looking for work", e);
- }
- }
- }, timerInitialDelay, timerPeriod, TimeUnit.MILLISECONDS);
+ ThreadPools.watchCriticalScheduledTask(
+ context.getScheduledExecutor().scheduleWithFixedDelay(new Runnable() {
+ @Override
+ public void run() {
+ log.debug("Looking for work in {}", path);
+ try {
+ lookForWork(processor, zoo.getChildren(path));
+ } catch (KeeperException e) {
+ log.error("Failed to look for work", e);
+ } catch (InterruptedException e) {
+ log.info("Interrupted looking for work", e);
+ }
+ }
+ }, timerInitialDelay, timerPeriod, TimeUnit.MILLISECONDS));
}
/**
@@ -240,6 +242,7 @@ public class DistributedWorkQueue {
}
public void addWork(String workId, byte[] data) throws KeeperException, InterruptedException {
+
if (workId.equalsIgnoreCase(LOCKS_NODE))
throw new IllegalArgumentException("locks is reserved work id");
diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index d534062..77a5c3b 100644
--- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -28,6 +28,7 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -162,12 +163,16 @@ public class CompactionCoordinator extends AbstractServer
}
protected void startGCLogger(ScheduledThreadPoolExecutor schedExecutor) {
- schedExecutor.scheduleWithFixedDelay(() -> gcLogger.logGCInfo(getConfiguration()), 0,
- TIME_BETWEEN_GC_CHECKS, TimeUnit.MILLISECONDS);
+ ScheduledFuture<?> future =
+ schedExecutor.scheduleWithFixedDelay(() -> gcLogger.logGCInfo(getConfiguration()), 0,
+ TIME_BETWEEN_GC_CHECKS, TimeUnit.MILLISECONDS);
+ ThreadPools.watchNonCriticalScheduledTask(future);
}
protected void startCompactionCleaner(ScheduledThreadPoolExecutor schedExecutor) {
- schedExecutor.scheduleWithFixedDelay(() -> cleanUpCompactors(), 0, 5, TimeUnit.MINUTES);
+ ScheduledFuture<?> future =
+ schedExecutor.scheduleWithFixedDelay(() -> cleanUpCompactors(), 0, 5, TimeUnit.MINUTES);
+ ThreadPools.watchNonCriticalScheduledTask(future);
}
protected void printStartupMsg() {
diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
index 424deb7..196ac2a 100644
--- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
@@ -83,8 +83,8 @@ public class CompactionFinalizer {
processPending();
});
- schedExecutor.scheduleWithFixedDelay(() -> notifyTservers(), 0, tserverCheckInterval,
- TimeUnit.MILLISECONDS);
+ ThreadPools.watchCriticalScheduledTask(schedExecutor.scheduleWithFixedDelay(
+ () -> notifyTservers(), 0, tserverCheckInterval, TimeUnit.MILLISECONDS));
}
public void commitCompaction(ExternalCompactionId ecid, KeyExtent extent, long fileSize,
diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java
index e4ee3bd..5a9e526 100644
--- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java
@@ -34,6 +34,7 @@ import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
+import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.server.ServerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -138,12 +139,12 @@ public class DeadCompactionDetector {
long interval = this.context.getConfiguration()
.getTimeInMillis(Property.COMPACTION_COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL);
- schedExecutor.scheduleWithFixedDelay(() -> {
+ ThreadPools.watchCriticalScheduledTask(schedExecutor.scheduleWithFixedDelay(() -> {
try {
detectDeadCompactions();
} catch (RuntimeException e) {
log.warn("Failed to look for dead compactions", e);
}
- }, 0, interval, TimeUnit.MILLISECONDS);
+ }, 0, interval, TimeUnit.MILLISECONDS));
}
}
diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 14b0545..d1cc457 100644
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -31,6 +31,7 @@ import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -196,14 +197,16 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac
}
protected void startGCLogger(ScheduledThreadPoolExecutor schedExecutor) {
- schedExecutor.scheduleWithFixedDelay(() -> gcLogger.logGCInfo(getConfiguration()), 0,
- TIME_BETWEEN_GC_CHECKS, TimeUnit.MILLISECONDS);
+ ScheduledFuture<?> future =
+ schedExecutor.scheduleWithFixedDelay(() -> gcLogger.logGCInfo(getConfiguration()), 0,
+ TIME_BETWEEN_GC_CHECKS, TimeUnit.MILLISECONDS);
+ ThreadPools.watchNonCriticalScheduledTask(future);
}
protected void startCancelChecker(ScheduledThreadPoolExecutor schedExecutor,
long timeBetweenChecks) {
- schedExecutor.scheduleWithFixedDelay(() -> checkIfCanceled(), 0, timeBetweenChecks,
- TimeUnit.MILLISECONDS);
+ ThreadPools.watchCriticalScheduledTask(schedExecutor.scheduleWithFixedDelay(
+ () -> checkIfCanceled(), 0, timeBetweenChecks, TimeUnit.MILLISECONDS));
}
protected void checkIfCanceled() {
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 7071569..7f360de 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -41,6 +41,7 @@ import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -252,11 +253,12 @@ public class Manager extends AbstractServer
if (newState == ManagerState.STOP) {
// Give the server a little time before shutdown so the client
// thread requesting the stop can return
- getContext().getScheduledExecutor().scheduleWithFixedDelay(() -> {
+ ScheduledFuture<?> future = getContext().getScheduledExecutor().scheduleWithFixedDelay(() -> {
// This frees the main thread and will cause the manager to exit
clientService.stop();
Manager.this.nextEvent.event("stopped event loop");
}, 100L, 1000L, TimeUnit.MILLISECONDS);
+ ThreadPools.watchNonCriticalScheduledTask(future);
}
if (oldState != newState && (newState == ManagerState.HAVE_LOCK)) {
@@ -930,7 +932,7 @@ public class Manager extends AbstractServer
// unresponsive tservers.
sleepUninterruptibly(Math.max(1, rpcTimeout / 120_000), TimeUnit.MILLISECONDS);
}
- tp.submit(() -> {
+ tp.execute(() -> {
try {
Thread t = Thread.currentThread();
String oldName = t.getName();
@@ -1150,8 +1152,8 @@ public class Manager extends AbstractServer
fate = new Fate<>(this, store, TraceRepo::toLogString);
fate.startTransactionRunners(getConfiguration());
- context.getScheduledExecutor().scheduleWithFixedDelay(store::ageOff, 63000, 63000,
- TimeUnit.MILLISECONDS);
+ ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor()
+ .scheduleWithFixedDelay(store::ageOff, 63000, 63000, TimeUnit.MILLISECONDS));
} catch (KeeperException | InterruptedException e) {
throw new IllegalStateException("Exception setting up FaTE cleanup thread", e);
}
@@ -1198,7 +1200,7 @@ public class Manager extends AbstractServer
// if the replication name is ever set, then start replication services
final AtomicReference<TServer> replServer = new AtomicReference<>();
- context.getScheduledExecutor().scheduleWithFixedDelay(() -> {
+ ScheduledFuture<?> future = context.getScheduledExecutor().scheduleWithFixedDelay(() -> {
try {
@SuppressWarnings("deprecation")
Property p = Property.REPLICATION_NAME;
@@ -1210,6 +1212,7 @@ public class Manager extends AbstractServer
log.error("Error occurred starting replication services. ", e);
}
}, 0, 5000, TimeUnit.MILLISECONDS);
+ ThreadPools.watchNonCriticalScheduledTask(future);
// checking stored user hashes if any of them uses an outdated algorithm
security.validateStoredUserCreditentials();
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/ManagerTime.java b/server/manager/src/main/java/org/apache/accumulo/manager/ManagerTime.java
index 41077bd..237a207 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/ManagerTime.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/ManagerTime.java
@@ -64,9 +64,9 @@ public class ManagerTime {
throw new IOException("Error updating manager time", ex);
}
- ThreadPools.createGeneralScheduledExecutorService(conf).scheduleWithFixedDelay(
- Threads.createNamedRunnable("Manager time keeper", () -> run()), 0, SECONDS.toMillis(10),
- MILLISECONDS);
+ ThreadPools.watchCriticalScheduledTask(ThreadPools.createGeneralScheduledExecutorService(conf)
+ .scheduleWithFixedDelay(Threads.createNamedRunnable("Manager time keeper", () -> run()), 0,
+ SECONDS.toMillis(10), MILLISECONDS));
}
/**
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ReplicationMetrics.java b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ReplicationMetrics.java
index 79eb5ac..c90337b 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ReplicationMetrics.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ReplicationMetrics.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -165,8 +166,9 @@ public class ReplicationMetrics implements MetricsProducer {
ThreadPools.createScheduledExecutorService(1, "replicationMetricsPoller", false);
Runtime.getRuntime().addShutdownHook(new Thread(scheduler::shutdownNow));
long minimumRefreshDelay = TimeUnit.SECONDS.toMillis(5);
- scheduler.scheduleAtFixedRate(this::update, minimumRefreshDelay, minimumRefreshDelay,
- TimeUnit.MILLISECONDS);
+ ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(this::update, minimumRefreshDelay,
+ minimumRefreshDelay, TimeUnit.MILLISECONDS);
+ ThreadPools.watchNonCriticalScheduledTask(future);
}
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java
index 055092f..42e18db 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.manager.metrics.fate;
import java.util.Map.Entry;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -155,14 +156,14 @@ public class FateMetrics implements MetricsProducer {
ThreadPools.createScheduledExecutorService(1, "fateMetricsPoller", false);
Runtime.getRuntime().addShutdownHook(new Thread(scheduler::shutdownNow));
- scheduler.scheduleAtFixedRate(() -> {
+ ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(() -> {
try {
update();
} catch (Exception ex) {
log.info("Failed to update fate metrics due to exception", ex);
}
}, refreshDelay, refreshDelay, TimeUnit.MILLISECONDS);
-
+ ThreadPools.watchNonCriticalScheduledTask(future);
}
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java b/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java
index 0503963..a1ba264 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java
@@ -30,6 +30,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.Constants;
@@ -105,7 +106,8 @@ public class RecoveryManager {
manager.getVolumeManager(), new Path(source));
if (time > 0) {
- executor.schedule(this, time, TimeUnit.MILLISECONDS);
+ ScheduledFuture<?> future = executor.schedule(this, time, TimeUnit.MILLISECONDS);
+ ThreadPools.watchNonCriticalScheduledTask(future);
rescheduled = true;
} else {
initiateSort(sortId, source, destination);
@@ -210,8 +212,9 @@ public class RecoveryManager {
log.info("Starting recovery of {} (in : {}s), tablet {} holds a reference", filename,
(delay / 1000), extent);
- executor.schedule(new LogSortTask(closer, filename, dest, sortId), delay,
- TimeUnit.MILLISECONDS);
+ ScheduledFuture<?> future = executor.schedule(
+ new LogSortTask(closer, filename, dest, sortId), delay, TimeUnit.MILLISECONDS);
+ ThreadPools.watchNonCriticalScheduledTask(future);
closeTasksQueued.add(sortId);
recoveryDelay.put(sortId, delay);
}
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/util/logging/AccumuloMonitorAppender.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/util/logging/AccumuloMonitorAppender.java
index 368e2a6..f62bb8b 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/util/logging/AccumuloMonitorAppender.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/util/logging/AccumuloMonitorAppender.java
@@ -26,8 +26,10 @@ import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpRequest.BodyPublishers;
+import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import org.apache.accumulo.core.Constants;
@@ -122,7 +124,9 @@ public class AccumuloMonitorAppender extends AbstractAppender {
var req = HttpRequest.newBuilder(uri).POST(BodyPublishers.ofString(jsonEvent, UTF_8))
.setHeader("Content-Type", "application/json").build();
- httpClient.sendAsync(req, BodyHandlers.discarding());
+ @SuppressWarnings("unused")
+ CompletableFuture<HttpResponse<Void>> future =
+ httpClient.sendAsync(req, BodyHandlers.discarding());
} catch (final Exception e) {
error("Unable to send HTTP in appender [" + getName() + "]", event, e);
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java
index df7f5fd..ece1d9c 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java
@@ -33,6 +33,7 @@ import org.apache.accumulo.core.manager.thrift.TabletLoadState;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.accumulo.server.manager.state.Assignment;
import org.apache.accumulo.server.manager.state.TabletStateStore;
@@ -220,22 +221,24 @@ class AssignmentHandler implements Runnable {
server.enqueueManagerMessage(new TabletStatusMessage(TabletLoadState.LOAD_FAILURE, extent));
long reschedule = Math.min((1L << Math.min(32, retryAttempt)) * 1000, MINUTES.toMillis(10));
log.warn(String.format("rescheduling tablet load in %.2f seconds", reschedule / 1000.));
- this.server.getContext().getScheduledExecutor().schedule(new Runnable() {
- @Override
- public void run() {
- log.info("adding tablet {} back to the assignment pool (retry {})", extent, retryAttempt);
- AssignmentHandler handler = new AssignmentHandler(server, extent, retryAttempt + 1);
- if (extent.isMeta()) {
- if (extent.isRootTablet()) {
- Threads.createThread("Root tablet assignment retry", handler).start();
- } else {
- server.resourceManager.addMetaDataAssignment(extent, log, handler);
+ ThreadPools.watchCriticalScheduledTask(
+ this.server.getContext().getScheduledExecutor().schedule(new Runnable() {
+ @Override
+ public void run() {
+ log.info("adding tablet {} back to the assignment pool (retry {})", extent,
+ retryAttempt);
+ AssignmentHandler handler = new AssignmentHandler(server, extent, retryAttempt + 1);
+ if (extent.isMeta()) {
+ if (extent.isRootTablet()) {
+ Threads.createThread("Root tablet assignment retry", handler).start();
+ } else {
+ server.resourceManager.addMetaDataAssignment(extent, log, handler);
+ }
+ } else {
+ server.resourceManager.addAssignment(extent, log, handler);
+ }
}
- } else {
- server.resourceManager.addAssignment(extent, log, handler);
- }
- }
- }, reschedule, TimeUnit.MILLISECONDS);
+ }, reschedule, TimeUnit.MILLISECONDS));
}
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index c4146c1..c53bb91 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -50,6 +50,7 @@ import java.util.UUID;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -260,7 +261,7 @@ public class TabletServer extends AbstractServer {
// This thread will calculate and log out the busiest tablets based on ingest count and
// query count every #{logBusiestTabletsDelay}
if (numBusyTabletsToLog > 0) {
- context.getScheduledExecutor()
+ ScheduledFuture<?> future = context.getScheduledExecutor()
.scheduleWithFixedDelay(Threads.createNamedRunnable("BusyTabletLogger", new Runnable() {
private BusiestTracker ingestTracker =
BusiestTracker.newBusiestIngestTracker(numBusyTabletsToLog);
@@ -285,9 +286,10 @@ public class TabletServer extends AbstractServer {
}
}
}), logBusyTabletsDelay, logBusyTabletsDelay, TimeUnit.MILLISECONDS);
+ ThreadPools.watchNonCriticalScheduledTask(future);
}
- context.getScheduledExecutor()
+ ScheduledFuture<?> future = context.getScheduledExecutor()
.scheduleWithFixedDelay(Threads.createNamedRunnable("TabletRateUpdater", new Runnable() {
@Override
public void run() {
@@ -301,6 +303,7 @@ public class TabletServer extends AbstractServer {
}
}
}), 5, 5, TimeUnit.SECONDS);
+ ThreadPools.watchNonCriticalScheduledTask(future);
@SuppressWarnings("deprecation")
final long walMaxSize =
@@ -348,8 +351,8 @@ public class TabletServer extends AbstractServer {
this.resourceManager = new TabletServerResourceManager(context);
this.security = AuditedSecurityOperation.getInstance(context);
- context.getScheduledExecutor().scheduleWithFixedDelay(TabletLocator::clearLocators, jitter(),
- jitter(), TimeUnit.MILLISECONDS);
+ ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor().scheduleWithFixedDelay(
+ TabletLocator::clearLocators, jitter(), jitter(), TimeUnit.MILLISECONDS));
walMarker = new WalStateManager(context);
// Create the secret manager
@@ -791,7 +794,7 @@ public class TabletServer extends AbstractServer {
// if the replication name is ever set, then start replication services
@SuppressWarnings("deprecation")
Property p = Property.REPLICATION_NAME;
- context.getScheduledExecutor().scheduleWithFixedDelay(() -> {
+ ScheduledFuture<?> future = context.getScheduledExecutor().scheduleWithFixedDelay(() -> {
if (this.replServer == null) {
if (!getConfiguration().get(p).isEmpty()) {
log.info(p.getKey() + " was set, starting repl services.");
@@ -799,36 +802,40 @@ public class TabletServer extends AbstractServer {
}
}
}, 0, 5, TimeUnit.SECONDS);
+ ThreadPools.watchNonCriticalScheduledTask(future);
int tabletCheckFrequency = 30 + random.nextInt(31); // random 30-60 minute delay
// Periodically check that metadata of tablets matches what is held in memory
- ThreadPools.createGeneralScheduledExecutorService(aconf).scheduleWithFixedDelay(() -> {
- final SortedMap<KeyExtent,Tablet> onlineTabletsSnapshot = onlineTablets.snapshot();
-
- Map<KeyExtent,Long> updateCounts = new HashMap<>();
-
- // gather updateCounts for each tablet
- onlineTabletsSnapshot.forEach((ke, tablet) -> {
- updateCounts.put(ke, tablet.getUpdateCount());
- });
-
- // gather metadata for all tablets readTablets()
- try (TabletsMetadata tabletsMetadata = getContext().getAmple().readTablets()
- .forTablets(onlineTabletsSnapshot.keySet()).fetch(FILES, LOGS, ECOMP, PREV_ROW).build()) {
-
- // for each tablet, compare its metadata to what is held in memory
- tabletsMetadata.forEach(tabletMetadata -> {
- KeyExtent extent = tabletMetadata.getExtent();
- Tablet tablet = onlineTabletsSnapshot.get(extent);
- Long counter = updateCounts.get(extent);
- tablet.compareTabletInfo(counter, tabletMetadata);
- });
- }
- }, tabletCheckFrequency, tabletCheckFrequency, TimeUnit.MINUTES);
+ ThreadPools.watchCriticalScheduledTask(
+ ThreadPools.createGeneralScheduledExecutorService(aconf).scheduleWithFixedDelay(() -> {
+ final SortedMap<KeyExtent,Tablet> onlineTabletsSnapshot = onlineTablets.snapshot();
+
+ Map<KeyExtent,Long> updateCounts = new HashMap<>();
+
+ // gather updateCounts for each tablet
+ onlineTabletsSnapshot.forEach((ke, tablet) -> {
+ updateCounts.put(ke, tablet.getUpdateCount());
+ });
+
+ // gather metadata for all tablets readTablets()
+ try (TabletsMetadata tabletsMetadata =
+ getContext().getAmple().readTablets().forTablets(onlineTabletsSnapshot.keySet())
+ .fetch(FILES, LOGS, ECOMP, PREV_ROW).build()) {
+
+ // for each tablet, compare its metadata to what is held in memory
+ tabletsMetadata.forEach(tabletMetadata -> {
+ KeyExtent extent = tabletMetadata.getExtent();
+ Tablet tablet = onlineTabletsSnapshot.get(extent);
+ Long counter = updateCounts.get(extent);
+ tablet.compareTabletInfo(counter, tabletMetadata);
+ });
+ }
+ }, tabletCheckFrequency, tabletCheckFrequency, TimeUnit.MINUTES));
final long CLEANUP_BULK_LOADED_CACHE_MILLIS = TimeUnit.MINUTES.toMillis(15);
- context.getScheduledExecutor().scheduleWithFixedDelay(new BulkImportCacheCleaner(this),
- CLEANUP_BULK_LOADED_CACHE_MILLIS, CLEANUP_BULK_LOADED_CACHE_MILLIS, TimeUnit.MILLISECONDS);
+ ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor().scheduleWithFixedDelay(
+ new BulkImportCacheCleaner(this), CLEANUP_BULK_LOADED_CACHE_MILLIS,
+ CLEANUP_BULK_LOADED_CACHE_MILLIS, TimeUnit.MILLISECONDS));
HostAndPort managerHost;
while (!serverStopRequested) {
@@ -953,8 +960,9 @@ public class TabletServer extends AbstractServer {
Runnable replicationWorkThreadPoolResizer = () -> {
ThreadPools.resizePool(replicationThreadPool, aconf, Property.REPLICATION_WORKER_THREADS);
};
- context.getScheduledExecutor().scheduleWithFixedDelay(replicationWorkThreadPoolResizer, 10, 30,
- TimeUnit.SECONDS);
+ ScheduledFuture<?> future = context.getScheduledExecutor()
+ .scheduleWithFixedDelay(replicationWorkThreadPoolResizer, 10, 30, TimeUnit.SECONDS);
+ ThreadPools.watchNonCriticalScheduledTask(future);
}
public String getClientAddressString() {
@@ -1019,8 +1027,9 @@ public class TabletServer extends AbstractServer {
Runnable gcDebugTask = () -> gcLogger.logGCInfo(getConfiguration());
- context.getScheduledExecutor().scheduleWithFixedDelay(gcDebugTask, 0, TIME_BETWEEN_GC_CHECKS,
- TimeUnit.MILLISECONDS);
+ ScheduledFuture<?> future = context.getScheduledExecutor().scheduleWithFixedDelay(gcDebugTask,
+ 0, TIME_BETWEEN_GC_CHECKS, TimeUnit.MILLISECONDS);
+ ThreadPools.watchNonCriticalScheduledTask(future);
}
public TabletServerStatus getStats(Map<TableId,MapCounter<ScanRunState>> scanCounts) {
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index 190b022..23ed47f 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@ -137,8 +137,8 @@ public class TabletServerResourceManager {
*/
private void modifyThreadPoolSizesAtRuntime(IntSupplier maxThreads, String name,
final ThreadPoolExecutor tp) {
- context.getScheduledExecutor().scheduleWithFixedDelay(
- () -> ThreadPools.resizePool(tp, maxThreads, name), 1, 10, TimeUnit.SECONDS);
+ ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor().scheduleWithFixedDelay(
+ () -> ThreadPools.resizePool(tp, maxThreads, name), 1, 10, TimeUnit.SECONDS));
}
private ThreadPoolExecutor createPriorityExecutor(ScanExecutorConfig sec,
@@ -378,8 +378,8 @@ public class TabletServerResourceManager {
// We can use the same map for both metadata and normal assignments since the keyspace (extent)
// is guaranteed to be unique. Schedule the task once, the task will reschedule itself.
- context.getScheduledExecutor().schedule(
- new AssignmentWatcher(acuConf, context, activeAssignments), 5000, TimeUnit.MILLISECONDS);
+ ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor().schedule(
+ new AssignmentWatcher(acuConf, context, activeAssignments), 5000, TimeUnit.MILLISECONDS));
}
/**
@@ -439,7 +439,8 @@ public class TabletServerResourceManager {
if (log.isTraceEnabled()) {
log.trace("Rescheduling assignment watcher to run in {}ms", delay);
}
- context.getScheduledExecutor().schedule(this, delay, TimeUnit.MILLISECONDS);
+ ThreadPools.watchCriticalScheduledTask(
+ context.getScheduledExecutor().schedule(this, delay, TimeUnit.MILLISECONDS));
}
}
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index b884e61..92089a0 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -263,7 +263,7 @@ public class TabletServerLogger {
return;
}
nextLogMaker = ThreadPools.createFixedThreadPool(1, "WALog creator", true);
- nextLogMaker.submit(new Runnable() {
+ nextLogMaker.execute(new Runnable() {
@Override
public void run() {
final ServerResources conf = tserver.getServerConfig();
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/CompactionExecutorsMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/CompactionExecutorsMetrics.java
index e5d79ad..955d3a2 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/CompactionExecutorsMetrics.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/CompactionExecutorsMetrics.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;
@@ -68,9 +69,9 @@ public class CompactionExecutorsMetrics implements MetricsProducer {
ThreadPools.createScheduledExecutorService(1, "compactionExecutorsMetricsPoller", false);
Runtime.getRuntime().addShutdownHook(new Thread(scheduler::shutdownNow));
long minimumRefreshDelay = TimeUnit.SECONDS.toMillis(5);
- scheduler.scheduleAtFixedRate(this::update, minimumRefreshDelay, minimumRefreshDelay,
- TimeUnit.MILLISECONDS);
-
+ ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(this::update, minimumRefreshDelay,
+ minimumRefreshDelay, TimeUnit.MILLISECONDS);
+ ThreadPools.watchNonCriticalScheduledTask(future);
}
public synchronized AutoCloseable addExecutor(CompactionExecutorId ceid,
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
index ec10d73..ae43495 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
@@ -29,6 +29,7 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -72,8 +73,8 @@ public class SessionManager {
Runnable r = () -> sweep(maxIdle, maxUpdateIdle);
- context.getScheduledExecutor().scheduleWithFixedDelay(r, 0, Math.max(maxIdle / 2, 1000),
- TimeUnit.MILLISECONDS);
+ ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor().scheduleWithFixedDelay(r,
+ 0, Math.max(maxIdle / 2, 1000), TimeUnit.MILLISECONDS));
}
public long createSession(Session session, boolean reserve) {
@@ -278,8 +279,9 @@ public class SessionManager {
}
};
- ThreadPools.createGeneralScheduledExecutorService(aconf).schedule(r, delay,
- TimeUnit.MILLISECONDS);
+ ScheduledFuture<?> future = ThreadPools.createGeneralScheduledExecutorService(aconf)
+ .schedule(r, delay, TimeUnit.MILLISECONDS);
+ ThreadPools.watchNonCriticalScheduledTask(future);
}
}
diff --git a/test/src/main/java/org/apache/accumulo/test/ConditionalWriterIT.java b/test/src/main/java/org/apache/accumulo/test/ConditionalWriterIT.java
index 48ee3c1..86a2b08 100644
--- a/test/src/main/java/org/apache/accumulo/test/ConditionalWriterIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ConditionalWriterIT.java
@@ -1247,7 +1247,7 @@ public class ConditionalWriterIT extends SharedMiniClusterBase {
ExecutorService tp = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
- tp.submit(new MutatorTask(tableName, client, rows, cw, failed));
+ tp.execute(new MutatorTask(tableName, client, rows, cw, failed));
}
tp.shutdown();
diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java
index 87b5c9f..a6cbc6f 100644
--- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java
+++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java
@@ -19,6 +19,7 @@
package org.apache.accumulo.test.compaction;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -45,7 +46,9 @@ public class ExternalDoNothingCompactor extends Compactor implements Iface {
@Override
protected void startCancelChecker(ScheduledThreadPoolExecutor schedExecutor,
long timeBetweenChecks) {
- schedExecutor.scheduleWithFixedDelay(() -> checkIfCanceled(), 0, 5000, TimeUnit.MILLISECONDS);
+ @SuppressWarnings("unused")
+ ScheduledFuture<?> future = schedExecutor.scheduleWithFixedDelay(() -> checkIfCanceled(), 0,
+ 5000, TimeUnit.MILLISECONDS);
}
@Override
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java
index bebd7ec..0e74000 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java
@@ -128,7 +128,7 @@ public class ScanIdIT extends AccumuloClusterHarness {
for (int scannerIndex = 0; scannerIndex < NUM_SCANNERS; scannerIndex++) {
ScannerThread st = new ScannerThread(client, scannerIndex, tableName, latch);
scanThreadsToClose.add(st);
- pool.submit(st);
+ pool.execute(st);
}
// wait for scanners to report a result.
diff --git a/test/src/main/java/org/apache/accumulo/test/manager/SuspendedTabletsIT.java b/test/src/main/java/org/apache/accumulo/test/manager/SuspendedTabletsIT.java
index 4d17c4c..3905872 100644
--- a/test/src/main/java/org/apache/accumulo/test/manager/SuspendedTabletsIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/manager/SuspendedTabletsIT.java
@@ -410,7 +410,7 @@ public class SuspendedTabletsIT extends ConfigurableMacBase {
answer.scan(ctx, tableName, metaName);
return answer;
});
- THREAD_POOL.submit(tlsFuture);
+ THREAD_POOL.execute(tlsFuture);
return tlsFuture.get(5, SECONDS);
} catch (TimeoutException ex) {
log.debug("Retrieval timed out", ex);
diff --git a/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java b/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java
index 4f93d3f..1358ab7 100644
--- a/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java
+++ b/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java
@@ -233,7 +233,7 @@ public class CollectTabletStats {
}
for (final KeyExtent ke : tabletsToTest) {
- threadPool.submit(() -> {
+ threadPool.execute(() -> {
try {
calcTabletStats(client, opts.tableName, opts.auths, ke, columns);
} catch (Exception e) {
@@ -318,7 +318,7 @@ public class CollectTabletStats {
CountDownLatch finishedSignal = new CountDownLatch(numThreads);
for (Test test : tests) {
- threadPool.submit(test);
+ threadPool.execute(test);
test.setSignals(startSignal, finishedSignal);
}