You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this text version.
Posted to commits@accumulo.apache.org by dl...@apache.org on 2022/02/24 15:11:41 UTC

[accumulo] 01/01: Beginning to address unchecked Futures

This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch deal-with-unchecked-futures
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 12abc74a961e6ab91d9b4da24a3faa0f55094301
Author: Dave Marion <dl...@apache.org>
AuthorDate: Thu Feb 24 15:10:13 2022 +0000

    Beginning to address unchecked Futures
---
 .../accumulo/core/clientImpl/ConditionalWriterImpl.java      | 12 +++++++++++-
 .../accumulo/core/clientImpl/TabletServerBatchWriter.java    |  2 +-
 .../core/util/ratelimit/SharedRateLimiterFactory.java        |  7 ++++++-
 core/src/main/java/org/apache/accumulo/fate/Fate.java        | 11 ++++++++++-
 pom.xml                                                      |  1 +
 5 files changed, 29 insertions(+), 4 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
index 182d59e..f1b5415 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
@@ -36,6 +36,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.DelayQueue;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -111,6 +112,7 @@ class ConditionalWriterImpl implements ConditionalWriter {
   private Map<String,ServerQueue> serverQueues;
   private DelayQueue<QCMutation> failedMutations = new DelayQueue<>();
   private ScheduledThreadPoolExecutor threadPool;
+  private final ScheduledFuture<?> failureTaskFuture;
 
   private class RQIterator implements Iterator<Result> {
 
@@ -378,12 +380,20 @@ class ConditionalWriterImpl implements ConditionalWriter {
         queue(mutations);
     };
 
-    threadPool.scheduleAtFixedRate(failureHandler, 250, 250, TimeUnit.MILLISECONDS);
+    failureTaskFuture =
+        threadPool.scheduleAtFixedRate(failureHandler, 250, 250, TimeUnit.MILLISECONDS);
   }
 
   @Override
   public Iterator<Result> write(Iterator<ConditionalMutation> mutations) {
 
+    synchronized (failureTaskFuture) {
+      if (failureTaskFuture.isDone()) {
+        // TabletServerBatchWriter throws a MutationsRejectedException.
+        throw new RuntimeException("Background task that re-queues failed mutations has failed.");
+      }
+    }
+
     BlockingQueue<Result> resultQueue = new LinkedBlockingQueue<>();
 
     List<QCMutation> mutationList = new ArrayList<>();
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
index fe25407..dfe06d5 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
@@ -775,7 +775,7 @@ public class TabletServerBatchWriter implements AutoCloseable {
 
       for (String server : servers)
         if (!queued.contains(server)) {
-          sendThreadPool.submit(new SendTask(server));
+          sendThreadPool.execute(new SendTask(server));
           queued.add(server);
         }
     }
diff --git a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java
index c5c6890..30435e5 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java
@@ -22,6 +22,7 @@ import java.lang.ref.WeakReference;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.WeakHashMap;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -41,6 +42,7 @@ public class SharedRateLimiterFactory {
   private static final long REPORT_RATE = 60000;
   private static final long UPDATE_RATE = 1000;
   private static SharedRateLimiterFactory instance = null;
+  private static ScheduledFuture<?> updateTaskFuture;
   private final Logger log = LoggerFactory.getLogger(SharedRateLimiterFactory.class);
   private final WeakHashMap<String,WeakReference<SharedRateLimiter>> activeLimiters =
       new WeakHashMap<>();
@@ -53,7 +55,7 @@ public class SharedRateLimiterFactory {
       instance = new SharedRateLimiterFactory();
 
       ScheduledThreadPoolExecutor svc = ThreadPools.createGeneralScheduledExecutorService(conf);
-      svc.scheduleWithFixedDelay(Threads
+      updateTaskFuture = svc.scheduleWithFixedDelay(Threads
           .createNamedRunnable("SharedRateLimiterFactory update polling", instance::updateAll),
           UPDATE_RATE, UPDATE_RATE, TimeUnit.MILLISECONDS);
 
@@ -89,6 +91,9 @@ public class SharedRateLimiterFactory {
    */
   public RateLimiter create(String name, RateProvider rateProvider) {
     synchronized (activeLimiters) {
+      if (updateTaskFuture.isDone()) {
+        log.warn("SharedRateLimiterFactory update task has failed.");
+      }
       var limiterRef = activeLimiters.get(name);
       var limiter = limiterRef == null ? null : limiterRef.get();
       if (limiter == null) {
diff --git a/core/src/main/java/org/apache/accumulo/fate/Fate.java b/core/src/main/java/org/apache/accumulo/fate/Fate.java
index a2559ea..3b7fc23 100644
--- a/core/src/main/java/org/apache/accumulo/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/fate/Fate.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.EnumSet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -54,6 +55,7 @@ public class Fate<T> {
   private final TStore<T> store;
   private final T environment;
   private ScheduledThreadPoolExecutor fatePoolWatcher;
+  private ScheduledFuture<?> fatePoolWatcherFuture;
   private ExecutorService executor;
 
   private static final EnumSet<TStatus> FINISHED_STATES =
@@ -66,6 +68,9 @@ public class Fate<T> {
     @Override
     public void run() {
       while (keepRunning.get()) {
+        if (isFatePoolResizerFailed()) {
+          log.warn("FaTE thread pool resizer scheduled task has failed.");
+        }
         long deferTime = 0;
         Long tid = null;
         try {
@@ -235,7 +240,7 @@ public class Fate<T> {
     final ThreadPoolExecutor pool =
         ThreadPools.createExecutorService(conf, Property.MANAGER_FATE_THREADPOOL_SIZE, true);
     fatePoolWatcher = ThreadPools.createGeneralScheduledExecutorService(conf);
-    fatePoolWatcher.schedule(() -> {
+    fatePoolWatcherFuture = fatePoolWatcher.schedule(() -> {
       // resize the pool if the property changed
       ThreadPools.resizePool(pool, conf, Property.MANAGER_FATE_THREADPOOL_SIZE);
       // If the pool grew, then ensure that there is a TransactionRunner for each thread
@@ -266,6 +271,10 @@ public class Fate<T> {
     return store.create();
   }
 
+  private synchronized boolean isFatePoolResizerFailed() {
+    return fatePoolWatcherFuture.isDone();
+  }
+
   // start work in the transaction.. it is safe to call this
   // multiple times for a transaction... but it will only seed once
   public void seedTransaction(long tid, Repo<T> repo, boolean autoCleanUp, String goalMessage) {
diff --git a/pom.xml b/pom.xml
index 2018882..acc3e7a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1663,6 +1663,7 @@
                   -Xep:CheckReturnValue:OFF \
                   -Xep:MustBeClosedChecker:OFF \
                   -Xep:ReturnValueIgnored:OFF \
+                  -Xep:FutureReturnValueIgnored:ERROR \
                   -Xep:UnicodeInCode:OFF \
                   <!-- error/warning patterns to specifically check -->
                   -Xep:ExpectedExceptionChecker \