You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by dl...@apache.org on 2022/02/24 15:11:41 UTC
[accumulo] 01/01: Beginning to address unchecked Futures
This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch deal-with-unchecked-futures
in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 12abc74a961e6ab91d9b4da24a3faa0f55094301
Author: Dave Marion <dl...@apache.org>
AuthorDate: Thu Feb 24 15:10:13 2022 +0000
Beginning to address unchecked Futures
---
.../accumulo/core/clientImpl/ConditionalWriterImpl.java | 12 +++++++++++-
.../accumulo/core/clientImpl/TabletServerBatchWriter.java | 2 +-
.../core/util/ratelimit/SharedRateLimiterFactory.java | 7 ++++++-
core/src/main/java/org/apache/accumulo/fate/Fate.java | 11 ++++++++++-
pom.xml | 1 +
5 files changed, 29 insertions(+), 4 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
index 182d59e..f1b5415 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
@@ -36,6 +36,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -111,6 +112,7 @@ class ConditionalWriterImpl implements ConditionalWriter {
private Map<String,ServerQueue> serverQueues;
private DelayQueue<QCMutation> failedMutations = new DelayQueue<>();
private ScheduledThreadPoolExecutor threadPool;
+ private final ScheduledFuture<?> failureTaskFuture;
private class RQIterator implements Iterator<Result> {
@@ -378,12 +380,20 @@ class ConditionalWriterImpl implements ConditionalWriter {
queue(mutations);
};
- threadPool.scheduleAtFixedRate(failureHandler, 250, 250, TimeUnit.MILLISECONDS);
+ failureTaskFuture =
+ threadPool.scheduleAtFixedRate(failureHandler, 250, 250, TimeUnit.MILLISECONDS);
}
@Override
public Iterator<Result> write(Iterator<ConditionalMutation> mutations) {
+ synchronized (failureTaskFuture) {
+ if (failureTaskFuture.isDone()) {
+ // TabletServerBatchWriter throws a MutationsRejectedException.
+ throw new RuntimeException("Background task that re-queues failed mutations has failed.");
+ }
+ }
+
BlockingQueue<Result> resultQueue = new LinkedBlockingQueue<>();
List<QCMutation> mutationList = new ArrayList<>();
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
index fe25407..dfe06d5 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
@@ -775,7 +775,7 @@ public class TabletServerBatchWriter implements AutoCloseable {
for (String server : servers)
if (!queued.contains(server)) {
- sendThreadPool.submit(new SendTask(server));
+ sendThreadPool.execute(new SendTask(server));
queued.add(server);
}
}
diff --git a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java
index c5c6890..30435e5 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java
@@ -22,6 +22,7 @@ import java.lang.ref.WeakReference;
import java.util.HashMap;
import java.util.Map;
import java.util.WeakHashMap;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -41,6 +42,7 @@ public class SharedRateLimiterFactory {
private static final long REPORT_RATE = 60000;
private static final long UPDATE_RATE = 1000;
private static SharedRateLimiterFactory instance = null;
+ private static ScheduledFuture<?> updateTaskFuture;
private final Logger log = LoggerFactory.getLogger(SharedRateLimiterFactory.class);
private final WeakHashMap<String,WeakReference<SharedRateLimiter>> activeLimiters =
new WeakHashMap<>();
@@ -53,7 +55,7 @@ public class SharedRateLimiterFactory {
instance = new SharedRateLimiterFactory();
ScheduledThreadPoolExecutor svc = ThreadPools.createGeneralScheduledExecutorService(conf);
- svc.scheduleWithFixedDelay(Threads
+ updateTaskFuture = svc.scheduleWithFixedDelay(Threads
.createNamedRunnable("SharedRateLimiterFactory update polling", instance::updateAll),
UPDATE_RATE, UPDATE_RATE, TimeUnit.MILLISECONDS);
@@ -89,6 +91,9 @@ public class SharedRateLimiterFactory {
*/
public RateLimiter create(String name, RateProvider rateProvider) {
synchronized (activeLimiters) {
+ if (updateTaskFuture.isDone()) {
+ log.warn("SharedRateLimiterFactory update task has failed.");
+ }
var limiterRef = activeLimiters.get(name);
var limiter = limiterRef == null ? null : limiterRef.get();
if (limiter == null) {
diff --git a/core/src/main/java/org/apache/accumulo/fate/Fate.java b/core/src/main/java/org/apache/accumulo/fate/Fate.java
index a2559ea..3b7fc23 100644
--- a/core/src/main/java/org/apache/accumulo/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/fate/Fate.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.EnumSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -54,6 +55,7 @@ public class Fate<T> {
private final TStore<T> store;
private final T environment;
private ScheduledThreadPoolExecutor fatePoolWatcher;
+ private ScheduledFuture<?> fatePoolWatcherFuture;
private ExecutorService executor;
private static final EnumSet<TStatus> FINISHED_STATES =
@@ -66,6 +68,9 @@ public class Fate<T> {
@Override
public void run() {
while (keepRunning.get()) {
+ if (isFatePoolResizerFailed()) {
+ log.warn("FaTE thread pool resizer scheduled task has failed.");
+ }
long deferTime = 0;
Long tid = null;
try {
@@ -235,7 +240,7 @@ public class Fate<T> {
final ThreadPoolExecutor pool =
ThreadPools.createExecutorService(conf, Property.MANAGER_FATE_THREADPOOL_SIZE, true);
fatePoolWatcher = ThreadPools.createGeneralScheduledExecutorService(conf);
- fatePoolWatcher.schedule(() -> {
+ fatePoolWatcherFuture = fatePoolWatcher.schedule(() -> {
// resize the pool if the property changed
ThreadPools.resizePool(pool, conf, Property.MANAGER_FATE_THREADPOOL_SIZE);
// If the pool grew, then ensure that there is a TransactionRunner for each thread
@@ -266,6 +271,10 @@ public class Fate<T> {
return store.create();
}
+ private synchronized boolean isFatePoolResizerFailed() {
+ return fatePoolWatcherFuture.isDone();
+ }
+
// start work in the transaction.. it is safe to call this
// multiple times for a transaction... but it will only seed once
public void seedTransaction(long tid, Repo<T> repo, boolean autoCleanUp, String goalMessage) {
diff --git a/pom.xml b/pom.xml
index 2018882..acc3e7a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1663,6 +1663,7 @@
-Xep:CheckReturnValue:OFF \
-Xep:MustBeClosedChecker:OFF \
-Xep:ReturnValueIgnored:OFF \
+ -Xep:FutureReturnValueIgnored:ERROR \
-Xep:UnicodeInCode:OFF \
<!-- error/warning patterns to specifically check -->
-Xep:ExpectedExceptionChecker \