Posted to notifications@accumulo.apache.org by GitBox <gi...@apache.org> on 2022/03/01 16:13:43 UTC

[GitHub] [accumulo] keith-turner commented on a change in pull request #2524: Beginning to address unchecked Futures

keith-turner commented on a change in pull request #2524:
URL: https://github.com/apache/accumulo/pull/2524#discussion_r816889328



##########
File path: core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
##########
@@ -40,13 +44,92 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@SuppressFBWarnings(value = "RV_EXCEPTION_NOT_THROWN",
+    justification = "Throwing Error for it to be caught by AccumuloUncaughtExceptionHandler")
 public class ThreadPools {
 
+  public static class ExecutionError extends Error {
+
+    private static final long serialVersionUID = 1L;
+
+    public ExecutionError(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
   private static final Logger LOG = LoggerFactory.getLogger(ThreadPools.class);
 
   // the number of seconds before we allow a thread to terminate with non-use.
   public static final long DEFAULT_TIMEOUT_MILLISECS = 180000L;
 
+  private static final ThreadPoolExecutor SCHEDULED_FUTURE_CHECKER_POOL =
+      createFixedThreadPool(1, "Scheduled Future Checker", false);
+
+  private static final ConcurrentLinkedQueue<ScheduledFuture<?>> CRITICAL_RUNNING_TASKS =
+      new ConcurrentLinkedQueue<>();
+
+  private static Runnable CRITICAL_TASK_CHECKER = new Runnable() {
+    @Override
+    public void run() {
+      while (true) {
+        Iterator<ScheduledFuture<?>> futures = CRITICAL_RUNNING_TASKS.iterator();
+        while (futures.hasNext()) {
+          ScheduledFuture<?> criticalTask = futures.next();
+          // Calling get() on a ScheduledFuture will block unless that scheduled task has
+          // completed. We call isDone() here instead. If the scheduled task is done then
+          // either it was a one-shot task, cancelled or an exception was thrown.
+          if (criticalTask.isDone()) {
+            // Now call get() to see if we get an exception.
+            try {
+              criticalTask.get();
+              // If we get here, then a scheduled task exited but did not throw an error
+              // or get canceled. This was likely a one-shot scheduled task (I don't think
+              // we can tell if it's one-shot or not, I think we have to assume that it is
+              // and that a recurring task would not normally be complete).
+              if (!CRITICAL_RUNNING_TASKS.remove(criticalTask)) {
+                LOG.warn("Unable to remove task from list of watched critical tasks");
+              }
+            } catch (ExecutionException ee) {
+              // An exception was thrown in the critical task. Throw the error here, which
+              // will then be caught by the AccumuloUncaughtExceptionHandler which will
+              // log the error and terminate the VM.
+              new ExecutionError("Critical scheduled task failed.", ee);
+            } catch (CancellationException ce) {
+              // do nothing here as it appears that the task was canceled. Remove it from
+              // the list of critical tasks
+              if (!CRITICAL_RUNNING_TASKS.remove(criticalTask)) {
+                LOG.warn("Unable to remove task from list of watched critical tasks");
+              }
+            } catch (InterruptedException ie) {
+              // current thread was interrupted waiting for get to return, which in theory,
+              // shouldn't happen since the task is done.
+              LOG.info("Interrupted while waiting to check on scheduled task.");
+              // Reset the interrupt state on this thread
+              Thread.interrupted();
+            }
+          }
+        }
+        try {
+          TimeUnit.MINUTES.sleep(1);
+        } catch (InterruptedException ie) {
+          // This thread was interrupted by something while sleeping. We don't want to exit
+          // this thread, so reset the interrupt state on this thread and keep going.
+          Thread.interrupted();
+        }
+      }
+    }
+  };
+
+  static {
+    SCHEDULED_FUTURE_CHECKER_POOL.execute(CRITICAL_TASK_CHECKER);
+  }
+
+  public static void watchCriticalScheduledTask(ScheduledFuture<?> future) {
+    CRITICAL_RUNNING_TASKS.add(future);
+  }
+

Review comment:
       If the future is not running and it's expected to be running, it would be nice to get any background exceptions, if they exist.
   
   ```suggestion
   public static void ensureRunning(ScheduledFuture<?> future, String message) {
     if (future.isDone()) {
       try {
         future.get();
       } catch (Exception e) {
         throw new IllegalStateException(message, e);
       }

       // It exited without an exception, but we still expect it to be running, so throw an exception.
       throw new IllegalStateException(message);
     }
   }
   ```
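For reference, a runnable sketch of the suggested helper, assuming it lives in a standalone class (the name `EnsureRunningDemo` is hypothetical; the review proposes `ThreadPools`). It narrows `catch (Exception e)` to the exceptions `get()` can throw, restores the interrupt flag, and shows a failing periodic task surfacing its original exception as the cause.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

// Hypothetical standalone holder; the review suggests putting ensureRunning in ThreadPools.
final class EnsureRunningDemo {

  // Throws if a task that should run until shutdown has stopped, attaching the
  // background failure (if there is one) as the cause.
  static void ensureRunning(ScheduledFuture<?> future, String message) {
    if (future.isDone()) {
      try {
        future.get(); // does not block: the future is already done
      } catch (ExecutionException | CancellationException e) {
        throw new IllegalStateException(message, e);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(message, e);
      }
      // It exited without an exception, but it is still expected to be running.
      throw new IllegalStateException(message);
    }
  }

  public static void main(String[] args) {
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    try {
      // A periodic task that fails on its first run.
      ScheduledFuture<?> timer = executor.scheduleAtFixedRate(() -> {
        throw new IllegalArgumentException("boom");
      }, 0, 10, TimeUnit.MILLISECONDS);
      while (!timer.isDone()) {
        LockSupport.parkNanos(1_000_000L); // wait ~1 ms for the failure to be recorded
      }
      try {
        ensureRunning(timer, "Latency timer thread has exited, cannot guarantee latency target");
      } catch (IllegalStateException e) {
        // Cause chain: IllegalStateException -> ExecutionException -> IllegalArgumentException
        System.out.println(e.getMessage() + ", caused by " + e.getCause().getCause());
      }
    } finally {
      executor.shutdownNow();
    }
  }
}
```

Because the check only calls `get()` after `isDone()` returns true, it never blocks the caller (for example `addMutation` or `write`), so it is cheap enough to run on every call.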

##########
File path: core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
##########
@@ -248,6 +251,10 @@ public synchronized void addMutation(TableId table, Mutation m)
       throw new IllegalStateException("Closed");
     if (m.size() == 0)
       throw new IllegalArgumentException("Can not add empty mutations");
+    if (this.latencyTimerFuture != null && this.latencyTimerFuture.isDone()) {
+      throw new RuntimeException(
+          "Latency timer thread has exited, cannot guarantee latency target");
+    }

Review comment:
       ```suggestion
       if (this.latencyTimerFuture != null) {
         ThreadPools.ensureRunning(this.latencyTimerFuture,
             "Latency timer thread has exited, cannot guarantee latency target");
       }
   ```

##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/CompactionExecutorsMetrics.java
##########
@@ -68,8 +69,9 @@ public CompactionExecutorsMetrics() {
         ThreadPools.createScheduledExecutorService(1, "compactionExecutorsMetricsPoller", false);
     Runtime.getRuntime().addShutdownHook(new Thread(scheduler::shutdownNow));
     long minimumRefreshDelay = TimeUnit.SECONDS.toMillis(5);
-    scheduler.scheduleAtFixedRate(this::update, minimumRefreshDelay, minimumRefreshDelay,
-        TimeUnit.MILLISECONDS);
+    @SuppressWarnings("unused")
+    ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(this::update, minimumRefreshDelay,

Review comment:
       Why ignore the future returned here?

##########
File path: core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
##########
@@ -380,12 +382,16 @@ private void reschedule(SendTask task) {
         queue(mutations);
     };
 
-    threadPool.scheduleAtFixedRate(failureHandler, 250, 250, MILLISECONDS);
+    failureTaskFuture = threadPool.scheduleAtFixedRate(failureHandler, 250, 250, MILLISECONDS);
   }
 
   @Override
   public Iterator<Result> write(Iterator<ConditionalMutation> mutations) {
 
+    if (failureTaskFuture.isDone()) {
+      throw new RuntimeException("Background task that re-queues failed mutations has exited.");
+    }

Review comment:
       ```suggestion
      ThreadPools.ensureRunning(failureTaskFuture, "Background task that re-queues failed mutations has exited.");
   ```

##########
File path: core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
##########
@@ -40,13 +44,92 @@
+          // Calling get() on a ScheduledFuture will block unless that scheduled task has
+          // completed. We call isDone() here instead. If the scheduled task is done then
+          // either it was a one-shot task, cancelled or an exception was thrown.
+          if (criticalTask.isDone()) {
+            // Now call get() to see if we get an exception.
+            try {
+              criticalTask.get();
+              // If we get here, then a scheduled task exited but did not throw an error
+              // or get canceled. This was likely a one-shot scheduled task (I don't think
+              // we can tell if it's one-shot or not, I think we have to assume that it is
+              // and that a recurring task would not normally be complete).

Review comment:
       We could assume that one-shot tasks should never be added to the list, so that even if a task exits without an exception, it's still considered an error that leaves the process unhealthy. That could indicate a bug in the scheduled task, so maybe throw an error or log one.
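The JDK documentation for `ScheduledExecutorService.scheduleAtFixedRate` says a periodic task only stops when it is cancelled, when the executor terminates (which also cancels it), or when a run throws, in which case `get()` throws an `ExecutionException` holding that exception. So a watched periodic future should never complete normally, while a one-shot task always does. The sketch below, with a hypothetical `TaskExitDemo` class and `Exit` enum, classifies a done future without blocking to make that difference visible.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

// Hypothetical helper for illustration; not part of Accumulo.
final class TaskExitDemo {

  enum Exit { RUNNING, COMPLETED, FAILED, CANCELLED }

  // Classifies a future without blocking: get() is only called once isDone() is true.
  static Exit classify(Future<?> f) {
    if (!f.isDone()) {
      return Exit.RUNNING;
    }
    try {
      f.get();
      return Exit.COMPLETED;
    } catch (CancellationException e) {
      return Exit.CANCELLED;
    } catch (ExecutionException e) {
      return Exit.FAILED;
    } catch (InterruptedException e) {
      // Not expected for a done future; restore the flag and report it as still running.
      Thread.currentThread().interrupt();
      return Exit.RUNNING;
    }
  }

  static void awaitDone(Future<?> f) {
    while (!f.isDone()) {
      LockSupport.parkNanos(1_000_000L); // ~1 ms
    }
  }

  public static void main(String[] args) {
    ScheduledExecutorService ex = Executors.newScheduledThreadPool(1);
    try {
      Future<?> oneShot = ex.schedule(() -> {}, 0, TimeUnit.MILLISECONDS);
      Future<?> failing = ex.scheduleAtFixedRate(() -> {
        throw new IllegalStateException("bug in periodic task");
      }, 0, 10, TimeUnit.MILLISECONDS);
      Future<?> healthy = ex.scheduleAtFixedRate(() -> {}, 0, 10, TimeUnit.MILLISECONDS);
      awaitDone(oneShot);
      awaitDone(failing);
      System.out.println("one-shot: " + classify(oneShot)); // COMPLETED
      System.out.println("failing periodic: " + classify(failing)); // FAILED
      System.out.println("healthy periodic: " + classify(healthy)); // RUNNING
      healthy.cancel(false);
      System.out.println("cancelled periodic: " + classify(healthy)); // CANCELLED
    } finally {
      ex.shutdownNow();
    }
  }
}
```

If only periodic tasks are registered, a `COMPLETED` result can only mean something was registered by mistake, which is why treating it as an error is reasonable.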

##########
File path: core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
##########
@@ -576,14 +583,18 @@ private synchronized void addFailedMutations(MutationSet failedMutations) {
     private MutationSet recentFailures = null;
     private long initTime;
     private final Runnable task;
+    private final ScheduledFuture<?> future;
 
     FailedMutations() {
       task =
           Threads.createNamedRunnable("failed mutationBatchWriterLatencyTimers handler", this::run);
-      executor.scheduleWithFixedDelay(task, 0, 500, MILLISECONDS);
+      future = executor.scheduleWithFixedDelay(task, 0, 500, MILLISECONDS);
     }
 
     private MutationSet init() {
+      if (future.isDone()) {
+        throw new RuntimeException("Background task that re-queues failed mutations has exited.");
+      }

Review comment:
       ```suggestion
        ThreadPools.ensureRunning(future, "Background task that re-queues failed mutations has exited.");
   ```

##########
File path: core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
##########
@@ -40,13 +44,92 @@
+  private static final ThreadPoolExecutor SCHEDULED_FUTURE_CHECKER_POOL =
+      createFixedThreadPool(1, "Scheduled Future Checker", false);
+
+  private static final ConcurrentLinkedQueue<ScheduledFuture<?>> CRITICAL_RUNNING_TASKS =
+      new ConcurrentLinkedQueue<>();

Review comment:
       This suggestion goes with another comment. We could have another list of non-critical tasks to watch. For the non-critical ones we would throw a RuntimeException instead of an Error.
   
   ```suggestion
     private static final ConcurrentLinkedQueue<ScheduledFuture<?>> CRITICAL_RUNNING_TASKS =
         new ConcurrentLinkedQueue<>();
     private static final ConcurrentLinkedQueue<ScheduledFuture<?>> NON_CRITICAL_RUNNING_TASKS =
         new ConcurrentLinkedQueue<>();
   ```

##########
File path: server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java
##########
@@ -155,7 +156,8 @@ public void registerMetrics(final MeterRegistry registry) {
         ThreadPools.createScheduledExecutorService(1, "fateMetricsPoller", false);
     Runtime.getRuntime().addShutdownHook(new Thread(scheduler::shutdownNow));
 
-    scheduler.scheduleAtFixedRate(() -> {
+    @SuppressWarnings("unused")
+    ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(() -> {

Review comment:
       Why ignore failures with this one? If it's because it's not critical, then maybe we need another ThreadPools method to watch non-critical scheduled tasks, so that we can still get important debug info if they die. The non-critical one could do almost the same as the critical one, but throw a RuntimeException instead of an ExecutionError. There could be two lists (critical and non-critical), both monitored by the same thread.
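A minimal sketch of the two-list idea, assuming a hypothetical `ScheduledTaskWatcher` class (none of these names exist in Accumulo). Critical failures are thrown as an `Error`, in the spirit of `ExecutionError`; non-critical ones are wrapped in a `RuntimeException` and handed to a reporter callback. Like the PR, it drops cancelled futures silently, and, following the earlier comment about one-shot tasks, it treats normal completion as a failure.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Consumer;

// Hypothetical class and method names; only the two-list idea comes from the review.
final class ScheduledTaskWatcher {

  private final Queue<Future<?>> critical = new ConcurrentLinkedQueue<>();
  private final Queue<Future<?>> nonCritical = new ConcurrentLinkedQueue<>();
  // Receives non-critical failures, e.g. a logger call in a real server.
  private final Consumer<RuntimeException> nonCriticalReporter;

  ScheduledTaskWatcher(Consumer<RuntimeException> nonCriticalReporter) {
    this.nonCriticalReporter = nonCriticalReporter;
  }

  void watchCritical(Future<?> f) {
    critical.add(f);
  }

  void watchNonCritical(Future<?> f) {
    nonCritical.add(f);
  }

  // One pass over both lists; meant to be called periodically by a single watcher thread.
  void checkOnce() {
    for (Throwable t : removeStopped(nonCritical)) {
      nonCriticalReporter.accept(new RuntimeException("Non-critical scheduled task exited", t));
    }
    List<Throwable> failures = removeStopped(critical);
    if (!failures.isEmpty()) {
      Error error = new Error("Critical scheduled task exited", failures.get(0));
      for (Throwable extra : failures.subList(1, failures.size())) {
        error.addSuppressed(extra);
      }
      // Intended to reach the uncaught exception handler and stop the process.
      throw error;
    }
  }

  // Removes every finished future and returns why each non-cancelled one stopped.
  // Normal completion counts as a failure, since only periodic tasks should be registered.
  private static List<Throwable> removeStopped(Queue<Future<?>> queue) {
    List<Throwable> failures = new ArrayList<>();
    Iterator<Future<?>> it = queue.iterator();
    while (it.hasNext()) {
      Future<?> f = it.next();
      if (!f.isDone()) {
        continue;
      }
      it.remove();
      if (f.isCancelled()) {
        continue;
      }
      try {
        f.get(); // does not block: the future is already done
        failures.add(new IllegalStateException("Periodic task completed without error"));
      } catch (ExecutionException e) {
        failures.add(e.getCause());
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
    return failures;
  }
}
```

Handing non-critical failures to a callback, rather than throwing them, is a deliberate choice in this sketch: with both lists monitored from one thread, an exception thrown out of that thread's loop would also end the checks on the critical list.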



