Posted to notifications@accumulo.apache.org by GitBox <gi...@apache.org> on 2022/02/24 15:41:26 UTC

[GitHub] [accumulo] dlmarion opened a new pull request #2524: Beginning to address unchecked Futures

dlmarion opened a new pull request #2524:
URL: https://github.com/apache/accumulo/pull/2524


   Related to #2520 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] keith-turner commented on a change in pull request #2524: Beginning to address unchecked Futures

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2524:
URL: https://github.com/apache/accumulo/pull/2524#discussion_r814960271



##########
File path: core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
##########
@@ -47,6 +51,70 @@
   // the number of seconds before we allow a thread to terminate with non-use.
   public static final long DEFAULT_TIMEOUT_MILLISECS = 180000L;
 
+  private static final ThreadPoolExecutor SCHEDULED_FUTURE_CHECKER_POOL =
+      createFixedThreadPool(1, "Scheduled Future Checker", false);
+
+  private static final ConcurrentLinkedQueue<ScheduledFuture<?>> CRITICAL_RUNNING_TASKS =
+      new ConcurrentLinkedQueue<>();
+
+  private static Runnable CRITICAL_TASK_CHECKER = new Runnable() {
+    @Override
+    public void run() {
+      while (true) {
+        Iterator<ScheduledFuture<?>> futures = CRITICAL_RUNNING_TASKS.iterator();
+        while (futures.hasNext()) {
+          ScheduledFuture<?> criticalTask = futures.next();
+          // Calling get() on a ScheduledFuture will block unless that scheduled task has
+          // completed. We call isDone() here instead. If the scheduled task is done then
+          // either it was a one-shot task, cancelled or an exception was thrown.
+          if (criticalTask.isDone()) {
+            // Now call get() to see if we get an exception.
+            try {
+              criticalTask.get();
+              // If we get here, then a scheduled task exited but did not throw an error
+              // or get canceled. This was likely a one-shot scheduled task.
+              if (!CRITICAL_RUNNING_TASKS.remove(criticalTask)) {
+                LOG.warn("Unable to remove task from list of watched critical tasks");
+              }
+            } catch (ExecutionException ee) {
+              // An exception was thrown in the critical task. Throw the error here, which
+              // will then be caught by the AccumuloUncaughtExceptionHandler which will
+              // log the error and terminate the VM.
+              throw new RuntimeException("Critical scheduled task failed.", ee.getCause());

Review comment:
       Calling ee.getCause() drops the stack trace associated with the ExecutionException, which may contain important information for debugging. Also, the uncaught exception handler only halts the VM when the throwable is an Error. Since a RuntimeException is not an Error, the VM will not be halted, and the thread that checks the futures will be dead.
   
   ```suggestion
                 throw new RuntimeException("Critical scheduled task failed.", ee);
   ```
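
To illustrate the first point, here is a minimal, self-contained sketch (not code from this PR; the class and method names are hypothetical). It runs a task that always fails and wraps the resulting ExecutionException both ways. Passing `ee` keeps one extra link in the cause chain, the ExecutionException itself, whose stack trace records where `get()` was called.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CauseChainDemo {

  // Counts the throwables in a cause chain, including the outermost one.
  static int chainLength(Throwable t) {
    int n = 0;
    for (Throwable cur = t; cur != null; cur = cur.getCause()) {
      n++;
    }
    return n;
  }

  // Runs a task that always fails, then wraps the failure either with the whole
  // ExecutionException (as in the suggestion) or with its cause only (as in the diff).
  static RuntimeException wrapFailure(boolean keepExecutionException) {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    try {
      Runnable failing = () -> {
        throw new IllegalStateException("boom");
      };
      Future<?> future = pool.submit(failing);
      future.get();
      throw new AssertionError("the task was expected to fail");
    } catch (ExecutionException ee) {
      return new RuntimeException("Critical scheduled task failed.",
          keepExecutionException ? ee : ee.getCause());
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(ie);
    } finally {
      pool.shutdownNow();
    }
  }

  public static void main(String[] args) {
    // RuntimeException -> ExecutionException -> IllegalStateException
    System.out.println(chainLength(wrapFailure(true)));
    // RuntimeException -> IllegalStateException
    System.out.println(chainLength(wrapFailure(false)));
  }
}
```

Either way the result is still a RuntimeException, so the second point of the comment (only an Error halts the VM) is unaffected by this change.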







[GitHub] [accumulo] keith-turner commented on a change in pull request #2524: Beginning to address unchecked Futures

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2524:
URL: https://github.com/apache/accumulo/pull/2524#discussion_r817989092



##########
File path: core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
##########
@@ -40,13 +44,92 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@SuppressFBWarnings(value = "RV_EXCEPTION_NOT_THROWN",
+    justification = "Throwing Error for it to be caught by AccumuloUncaughtExceptionHandler")
 public class ThreadPools {
 
+  public static class ExecutionError extends Error {
+
+    private static final long serialVersionUID = 1L;
+
+    public ExecutionError(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
   private static final Logger LOG = LoggerFactory.getLogger(ThreadPools.class);
 
   // the number of seconds before we allow a thread to terminate with non-use.
   public static final long DEFAULT_TIMEOUT_MILLISECS = 180000L;
 
+  private static final ThreadPoolExecutor SCHEDULED_FUTURE_CHECKER_POOL =
+      createFixedThreadPool(1, "Scheduled Future Checker", false);
+
+  private static final ConcurrentLinkedQueue<ScheduledFuture<?>> CRITICAL_RUNNING_TASKS =
+      new ConcurrentLinkedQueue<>();
+
+  private static Runnable CRITICAL_TASK_CHECKER = new Runnable() {
+    @Override
+    public void run() {
+      while (true) {
+        Iterator<ScheduledFuture<?>> futures = CRITICAL_RUNNING_TASKS.iterator();
+        while (futures.hasNext()) {
+          ScheduledFuture<?> criticalTask = futures.next();
+          // Calling get() on a ScheduledFuture will block unless that scheduled task has
+          // completed. We call isDone() here instead. If the scheduled task is done then
+          // either it was a one-shot task, cancelled or an exception was thrown.
+          if (criticalTask.isDone()) {
+            // Now call get() to see if we get an exception.
+            try {
+              criticalTask.get();
+              // If we get here, then a scheduled task exited but did not throw an error
+              // or get canceled. This was likely a one-shot scheduled task (I don't think
+              // we can tell if it's one-shot or not, I think we have to assume that it is
+              // and that a recurring task would not normally be complete).

Review comment:
       Thinking more about it, I think what I was worried about cannot happen. A recurring scheduled task will never "exit"; it will probably only report done if there is an error or it's cancelled.
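
That behavior can be checked with a small standalone sketch using only `java.util.concurrent` (the class and method names are hypothetical and not part of the PR). It observes `isDone()` on a recurring task while it is still being rescheduled, after a run throws, and after cancellation.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class RecurringFutureDemo {

  // Returns {doneWhileRecurring, doneAfterFailure, doneAfterCancel}.
  static boolean[] observe() {
    ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
    try {
      // A healthy recurring task: wait until it has run at least twice.
      CountDownLatch ranTwice = new CountDownLatch(2);
      ScheduledFuture<?> healthy =
          scheduler.scheduleAtFixedRate(ranTwice::countDown, 0, 10, TimeUnit.MILLISECONDS);
      if (!ranTwice.await(5, TimeUnit.SECONDS)) {
        throw new IllegalStateException("healthy task did not run in time");
      }
      boolean doneWhileRecurring = healthy.isDone();

      // A recurring task whose first run throws: later runs are suppressed.
      ScheduledFuture<?> failing = scheduler.scheduleAtFixedRate(() -> {
        throw new IllegalStateException("boom");
      }, 0, 10, TimeUnit.MILLISECONDS);
      boolean doneAfterFailure = false;
      try {
        failing.get(5, TimeUnit.SECONDS);
      } catch (ExecutionException expected) {
        doneAfterFailure = failing.isDone();
      }

      // Cancelling the healthy task also makes it report done.
      healthy.cancel(false);
      boolean doneAfterCancel = healthy.isDone();

      return new boolean[] {doneWhileRecurring, doneAfterFailure, doneAfterCancel};
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(ie);
    } catch (TimeoutException te) {
      throw new IllegalStateException(te);
    } finally {
      scheduler.shutdownNow();
    }
  }

  public static void main(String[] args) {
    boolean[] r = observe();
    System.out.println(r[0] + " " + r[1] + " " + r[2]); // false true true
  }
}
```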







[GitHub] [accumulo] dlmarion commented on a change in pull request #2524: Beginning to address unchecked Futures

Posted by GitBox <gi...@apache.org>.
dlmarion commented on a change in pull request #2524:
URL: https://github.com/apache/accumulo/pull/2524#discussion_r818019250



##########
File path: core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
##########
@@ -40,13 +44,129 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@SuppressFBWarnings(value = "RV_EXCEPTION_NOT_THROWN",
+    justification = "Throwing Error for it to be caught by AccumuloUncaughtExceptionHandler")
 public class ThreadPools {
 
+  public static class ExecutionError extends Error {
+
+    private static final long serialVersionUID = 1L;
+
+    public ExecutionError(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
   private static final Logger LOG = LoggerFactory.getLogger(ThreadPools.class);
 
   // the number of seconds before we allow a thread to terminate with non-use.
   public static final long DEFAULT_TIMEOUT_MILLISECS = 180000L;
 
+  private static final ThreadPoolExecutor SCHEDULED_FUTURE_CHECKER_POOL =
+      createFixedThreadPool(1, "Scheduled Future Checker", false);
+
+  private static final ConcurrentLinkedQueue<ScheduledFuture<?>> CRITICAL_RUNNING_TASKS =
+      new ConcurrentLinkedQueue<>();
+
+  private static final ConcurrentLinkedQueue<ScheduledFuture<?>> NON_CRITICAL_RUNNING_TASKS =
+      new ConcurrentLinkedQueue<>();
+
+  private static Runnable TASK_CHECKER = new Runnable() {
+    @Override
+    public void run() {
+      while (true) {
+        Iterator<ScheduledFuture<?>> criticalTasks = CRITICAL_RUNNING_TASKS.iterator();
+        while (criticalTasks.hasNext()) {
+          checkTaskFailed(criticalTasks.next(), true);
+        }
+        Iterator<ScheduledFuture<?>> nonCriticalTasks = NON_CRITICAL_RUNNING_TASKS.iterator();
+        while (nonCriticalTasks.hasNext()) {
+          checkTaskFailed(nonCriticalTasks.next(), false);
+        }
+        try {
+          TimeUnit.MINUTES.sleep(1);
+        } catch (InterruptedException ie) {
+          // This thread was interrupted by something while sleeping. We don't want to exit
+          // this thread, so reset the interrupt state on this thread and keep going.
+          Thread.interrupted();
+        }
+      }
+    }
+  };
+
+  private static void checkTaskFailed(Future<?> future, boolean critical) {
+    // Calling get() on a ScheduledFuture will block unless that scheduled task has
+    // completed. We call isDone() here instead. If the scheduled task is done then
+    // either it was a one-shot task, cancelled or an exception was thrown.
+    if (future.isDone()) {
+      // Now call get() to see if we get an exception.
+      try {
+        future.get();
+        // If we get here, then a scheduled task exited but did not throw an error
+        // or get canceled. This was likely a one-shot scheduled task (I don't think
+        // we can tell if it's one-shot or not, I think we have to assume that it is
+        // and that a recurring task would not normally be complete).
+        boolean removed = critical ? CRITICAL_RUNNING_TASKS.remove(future)
+            : NON_CRITICAL_RUNNING_TASKS.remove(future);
+        if (!removed) {
+          LOG.warn("Unable to remove task from list of watched tasks");
+        }
+      } catch (ExecutionException ee) {
+        // An exception was thrown in the critical task. Throw the error here, which
+        // will then be caught by the AccumuloUncaughtExceptionHandler which will
+        // log the error and terminate the VM.
+        if (critical) {
+          throw new ExecutionError("Critical scheduled task failed.", ee);
+        } else {
+          LOG.error("Non-critical task failed", ee);

Review comment:
       Addressed in 9b53ceb







[GitHub] [accumulo] dlmarion commented on a change in pull request #2524: Beginning to address unchecked Futures

Posted by GitBox <gi...@apache.org>.
dlmarion commented on a change in pull request #2524:
URL: https://github.com/apache/accumulo/pull/2524#discussion_r817678247



##########
File path: core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
##########
@@ -248,6 +251,10 @@ public synchronized void addMutation(TableId table, Mutation m)
       throw new IllegalStateException("Closed");
     if (m.size() == 0)
       throw new IllegalArgumentException("Can not add empty mutations");
+    if (this.latencyTimerFuture != null && this.latencyTimerFuture.isDone()) {
+      throw new RuntimeException(
+          "Latency timer thread has exited, cannot guarantee latency target");
+    }

Review comment:
       Addressed in ab25541







[GitHub] [accumulo] keith-turner commented on a change in pull request #2524: Beginning to address unchecked Futures

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2524:
URL: https://github.com/apache/accumulo/pull/2524#discussion_r815131595



##########
File path: core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
##########
@@ -47,6 +51,70 @@
   // the number of seconds before we allow a thread to terminate with non-use.
   public static final long DEFAULT_TIMEOUT_MILLISECS = 180000L;
 
+  private static final ThreadPoolExecutor SCHEDULED_FUTURE_CHECKER_POOL =
+      createFixedThreadPool(1, "Scheduled Future Checker", false);
+
+  private static final ConcurrentLinkedQueue<ScheduledFuture<?>> CRITICAL_RUNNING_TASKS =
+      new ConcurrentLinkedQueue<>();
+
+  private static Runnable CRITICAL_TASK_CHECKER = new Runnable() {
+    @Override
+    public void run() {
+      while (true) {
+        Iterator<ScheduledFuture<?>> futures = CRITICAL_RUNNING_TASKS.iterator();
+        while (futures.hasNext()) {
+          ScheduledFuture<?> criticalTask = futures.next();
+          // Calling get() on a ScheduledFuture will block unless that scheduled task has
+          // completed. We call isDone() here instead. If the scheduled task is done then
+          // either it was a one-shot task, cancelled or an exception was thrown.
+          if (criticalTask.isDone()) {
+            // Now call get() to see if we get an exception.
+            try {
+              criticalTask.get();
+              // If we get here, then a scheduled task exited but did not throw an error
+              // or get canceled. This was likely a one-shot scheduled task.
+              if (!CRITICAL_RUNNING_TASKS.remove(criticalTask)) {
+                LOG.warn("Unable to remove task from list of watched critical tasks");
+              }
+            } catch (ExecutionException ee) {
+              // An exception was thrown in the critical task. Throw the error here, which
+              // will then be caught by the AccumuloUncaughtExceptionHandler which will
+              // log the error and terminate the VM.
+              throw new RuntimeException("Critical scheduled task failed.", ee.getCause());

Review comment:
       This code looks good. I was just thinking about the case where something like `new RuntimeException(<some error>)` happens somewhere else in the code and makes it to the UEH (uncaught exception handler); it would be nice if the handler unraveled it and found the error.
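
One way such unraveling might look is sketched below. This is not the actual AccumuloUncaughtExceptionHandler code; `findError` and the depth limit of 32 are hypothetical choices made for illustration. The helper walks the cause chain and returns the first Error it finds, which a handler could then use to decide whether to halt.

```java
public class FindErrorDemo {

  // Hypothetical helper: walks the cause chain and returns the first Error
  // found, or null if there is none. The depth limit guards against a
  // (pathological) cyclic cause chain.
  static Error findError(Throwable t) {
    int depth = 0;
    for (Throwable cur = t; cur != null && depth < 32; cur = cur.getCause(), depth++) {
      if (cur instanceof Error) {
        return (Error) cur;
      }
    }
    return null;
  }

  public static void main(String[] args) {
    Throwable wrapped = new RuntimeException("wrapper", new OutOfMemoryError("simulated"));
    Throwable plain = new RuntimeException("no error inside");

    System.out.println(findError(wrapped) != null); // true
    System.out.println(findError(plain) != null); // false
  }
}
```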







[GitHub] [accumulo] dlmarion commented on a change in pull request #2524: Beginning to address unchecked Futures

Posted by GitBox <gi...@apache.org>.
dlmarion commented on a change in pull request #2524:
URL: https://github.com/apache/accumulo/pull/2524#discussion_r814979489



##########
File path: core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
##########
@@ -47,6 +51,70 @@
   // the number of seconds before we allow a thread to terminate with non-use.
   public static final long DEFAULT_TIMEOUT_MILLISECS = 180000L;
 
+  private static final ThreadPoolExecutor SCHEDULED_FUTURE_CHECKER_POOL =
+      createFixedThreadPool(1, "Scheduled Future Checker", false);
+
+  private static final ConcurrentLinkedQueue<ScheduledFuture<?>> CRITICAL_RUNNING_TASKS =
+      new ConcurrentLinkedQueue<>();
+
+  private static Runnable CRITICAL_TASK_CHECKER = new Runnable() {
+    @Override
+    public void run() {
+      while (true) {
+        Iterator<ScheduledFuture<?>> futures = CRITICAL_RUNNING_TASKS.iterator();
+        while (futures.hasNext()) {
+          ScheduledFuture<?> criticalTask = futures.next();
+          // Calling get() on a ScheduledFuture will block unless that scheduled task has
+          // completed. We call isDone() here instead. If the scheduled task is done then
+          // either it was a one-shot task, cancelled or an exception was thrown.
+          if (criticalTask.isDone()) {
+            // Now call get() to see if we get an exception.
+            try {
+              criticalTask.get();
+              // If we get here, then a scheduled task exited but did not throw an error
+              // or get canceled. This was likely a one-shot scheduled task.
+              if (!CRITICAL_RUNNING_TASKS.remove(criticalTask)) {
+                LOG.warn("Unable to remove task from list of watched critical tasks");
+              }
+            } catch (ExecutionException ee) {
+              // An exception was thrown in the critical task. Throw the error here, which
+              // will then be caught by the AccumuloUncaughtExceptionHandler which will
+              // log the error and terminate the VM.
+              throw new RuntimeException("Critical scheduled task failed.", ee.getCause());

Review comment:
       Thinking of throwing com.google.common.util.concurrent.ExecutionError here...







[GitHub] [accumulo] dlmarion commented on a change in pull request #2524: Beginning to address unchecked Futures

Posted by GitBox <gi...@apache.org>.
dlmarion commented on a change in pull request #2524:
URL: https://github.com/apache/accumulo/pull/2524#discussion_r817678548



##########
File path: core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
##########
@@ -40,13 +44,92 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@SuppressFBWarnings(value = "RV_EXCEPTION_NOT_THROWN",
+    justification = "Throwing Error for it to be caught by AccumuloUncaughtExceptionHandler")
 public class ThreadPools {
 
+  public static class ExecutionError extends Error {
+
+    private static final long serialVersionUID = 1L;
+
+    public ExecutionError(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
   private static final Logger LOG = LoggerFactory.getLogger(ThreadPools.class);
 
   // the number of seconds before we allow a thread to terminate with non-use.
   public static final long DEFAULT_TIMEOUT_MILLISECS = 180000L;
 
+  private static final ThreadPoolExecutor SCHEDULED_FUTURE_CHECKER_POOL =
+      createFixedThreadPool(1, "Scheduled Future Checker", false);
+
+  private static final ConcurrentLinkedQueue<ScheduledFuture<?>> CRITICAL_RUNNING_TASKS =
+      new ConcurrentLinkedQueue<>();
+
+  private static Runnable CRITICAL_TASK_CHECKER = new Runnable() {
+    @Override
+    public void run() {
+      while (true) {
+        Iterator<ScheduledFuture<?>> futures = CRITICAL_RUNNING_TASKS.iterator();
+        while (futures.hasNext()) {
+          ScheduledFuture<?> criticalTask = futures.next();
+          // Calling get() on a ScheduledFuture will block unless that scheduled task has
+          // completed. We call isDone() here instead. If the scheduled task is done then
+          // either it was a one-shot task, cancelled or an exception was thrown.
+          if (criticalTask.isDone()) {
+            // Now call get() to see if we get an exception.
+            try {
+              criticalTask.get();
+              // If we get here, then a scheduled task exited but did not throw an error
+              // or get canceled. This was likely a one-shot scheduled task (I don't think
+              // we can tell if it's one-shot or not, I think we have to assume that it is
+              // and that a recurring task would not normally be complete).
+              if (!CRITICAL_RUNNING_TASKS.remove(criticalTask)) {
+                LOG.warn("Unable to remove task from list of watched critical tasks");
+              }
+            } catch (ExecutionException ee) {
+              // An exception was thrown in the critical task. Throw the error here, which
+              // will then be caught by the AccumuloUncaughtExceptionHandler which will
+              // log the error and terminate the VM.
+              new ExecutionError("Critical scheduled task failed.", ee);
+            } catch (CancellationException ce) {
+              // do nothing here as it appears that the task was canceled. Remove it from
+              // the list of critical tasks
+              if (!CRITICAL_RUNNING_TASKS.remove(criticalTask)) {
+                LOG.warn("Unable to remove task from list of watched critical tasks");
+              }
+            } catch (InterruptedException ie) {
+              // current thread was interrupted waiting for get to return, which in theory,
+              // shouldn't happen since the task is done.
+              LOG.info("Interrupted while waiting to check on scheduled task.");
+              // Reset the interrupt state on this thread
+              Thread.interrupted();
+            }
+          }
+        }
+        try {
+          TimeUnit.MINUTES.sleep(1);
+        } catch (InterruptedException ie) {
+          // This thread was interrupted by something while sleeping. We don't want to exit
+          // this thread, so reset the interrupt state on this thread and keep going.
+          Thread.interrupted();
+        }
+      }
+    }
+  };
+
+  static {
+    SCHEDULED_FUTURE_CHECKER_POOL.execute(CRITICAL_TASK_CHECKER);
+  }
+
+  public static void watchCriticalScheduledTask(ScheduledFuture<?> future) {
+    CRITICAL_RUNNING_TASKS.add(future);
+  }
+

Review comment:
       Addressed in ab25541







[GitHub] [accumulo] keith-turner commented on a change in pull request #2524: Beginning to address unchecked Futures

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2524:
URL: https://github.com/apache/accumulo/pull/2524#discussion_r816889328



##########
File path: core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
##########
@@ -40,13 +44,92 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@SuppressFBWarnings(value = "RV_EXCEPTION_NOT_THROWN",
+    justification = "Throwing Error for it to be caught by AccumuloUncaughtExceptionHandler")
 public class ThreadPools {
 
+  public static class ExecutionError extends Error {
+
+    private static final long serialVersionUID = 1L;
+
+    public ExecutionError(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
   private static final Logger LOG = LoggerFactory.getLogger(ThreadPools.class);
 
   // the number of seconds before we allow a thread to terminate with non-use.
   public static final long DEFAULT_TIMEOUT_MILLISECS = 180000L;
 
+  private static final ThreadPoolExecutor SCHEDULED_FUTURE_CHECKER_POOL =
+      createFixedThreadPool(1, "Scheduled Future Checker", false);
+
+  private static final ConcurrentLinkedQueue<ScheduledFuture<?>> CRITICAL_RUNNING_TASKS =
+      new ConcurrentLinkedQueue<>();
+
+  private static Runnable CRITICAL_TASK_CHECKER = new Runnable() {
+    @Override
+    public void run() {
+      while (true) {
+        Iterator<ScheduledFuture<?>> futures = CRITICAL_RUNNING_TASKS.iterator();
+        while (futures.hasNext()) {
+          ScheduledFuture<?> criticalTask = futures.next();
+          // Calling get() on a ScheduledFuture will block unless that scheduled task has
+          // completed. We call isDone() here instead. If the scheduled task is done then
+          // either it was a one-shot task, cancelled or an exception was thrown.
+          if (criticalTask.isDone()) {
+            // Now call get() to see if we get an exception.
+            try {
+              criticalTask.get();
+              // If we get here, then a scheduled task exited but did not throw an error
+              // or get canceled. This was likely a one-shot scheduled task (I don't think
+              // we can tell if it's one-shot or not, I think we have to assume that it is
+              // and that a recurring task would not normally be complete).
+              if (!CRITICAL_RUNNING_TASKS.remove(criticalTask)) {
+                LOG.warn("Unable to remove task from list of watched critical tasks");
+              }
+            } catch (ExecutionException ee) {
+              // An exception was thrown in the critical task. Throw the error here, which
+              // will then be caught by the AccumuloUncaughtExceptionHandler which will
+              // log the error and terminate the VM.
+              new ExecutionError("Critical scheduled task failed.", ee);
+            } catch (CancellationException ce) {
+              // do nothing here as it appears that the task was canceled. Remove it from
+              // the list of critical tasks
+              if (!CRITICAL_RUNNING_TASKS.remove(criticalTask)) {
+                LOG.warn("Unable to remove task from list of watched critical tasks");
+              }
+            } catch (InterruptedException ie) {
+              // current thread was interrupted waiting for get to return, which in theory,
+              // shouldn't happen since the task is done.
+              LOG.info("Interrupted while waiting to check on scheduled task.");
+              // Reset the interrupt state on this thread
+              Thread.interrupted();
+            }
+          }
+        }
+        try {
+          TimeUnit.MINUTES.sleep(1);
+        } catch (InterruptedException ie) {
+          // This thread was interrupted by something while sleeping. We don't want to exit
+          // this thread, so reset the interrupt state on this thread and keep going.
+          Thread.interrupted();
+        }
+      }
+    }
+  };
+
+  static {
+    SCHEDULED_FUTURE_CHECKER_POOL.execute(CRITICAL_TASK_CHECKER);
+  }
+
+  public static void watchCriticalScheduledTask(ScheduledFuture<?> future) {
+    CRITICAL_RUNNING_TASKS.add(future);
+  }
+

Review comment:
       If the future is not running and it's expected to be running, it would be nice to get any background exceptions if they exist.
   
   ```suggestion
   public static void ensureRunning(ScheduledFuture<?> future, String message) {
     if (future.isDone()) {
       try {
         future.get();
       } catch (Exception e) {
         throw new IllegalStateException(message, e);
       }

       // It exited without an exception, but we still expect it to be running, so throw.
       throw new IllegalStateException(message);
     }
   }
   ```
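
A standalone version of this idea, exercised against a real scheduler, might look like the sketch below. It is an illustration under assumptions, not the code that was merged: the class name and the `check` helper are hypothetical, and unlike the suggestion it restores the interrupt flag when `get()` is interrupted.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class EnsureRunningDemo {

  // Throws IllegalStateException if a task that should still be recurring is done.
  static void ensureRunning(ScheduledFuture<?> future, String message) {
    if (!future.isDone()) {
      return;
    }
    try {
      future.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(message, e);
    } catch (ExecutionException | CancellationException e) {
      // Surface the background failure (or cancellation) as the cause.
      throw new IllegalStateException(message, e);
    }
    // Done without an exception, but it was expected to keep running.
    throw new IllegalStateException(message);
  }

  // Hypothetical driver: schedules a recurring task and reports what ensureRunning sees.
  static String check(boolean failTask) {
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    try {
      Runnable task;
      if (failTask) {
        task = () -> {
          throw new IllegalStateException("boom");
        };
      } else {
        task = () -> {};
      }
      ScheduledFuture<?> future =
          scheduler.scheduleWithFixedDelay(task, 0, 10, TimeUnit.MILLISECONDS);
      if (failTask) {
        // Wait for the first run to fail so the future is done before checking.
        try {
          future.get(5, TimeUnit.SECONDS);
        } catch (ExecutionException | TimeoutException expected) {
          // Only used to wait; ensureRunning below reports the outcome.
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
        }
      }
      try {
        ensureRunning(future, "Background task has exited.");
        return "running";
      } catch (IllegalStateException e) {
        Throwable cause = e.getCause();
        return "stopped: " + (cause == null ? "no cause" : cause.getClass().getSimpleName());
      }
    } finally {
      scheduler.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println(check(false)); // running
    System.out.println(check(true)); // stopped: ExecutionException
  }
}
```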

##########
File path: core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
##########
@@ -248,6 +251,10 @@ public synchronized void addMutation(TableId table, Mutation m)
       throw new IllegalStateException("Closed");
     if (m.size() == 0)
       throw new IllegalArgumentException("Can not add empty mutations");
+    if (this.latencyTimerFuture != null && this.latencyTimerFuture.isDone()) {
+      throw new RuntimeException(
+          "Latency timer thread has exited, cannot guarantee latency target");
+    }

Review comment:
       ```suggestion
       if (this.latencyTimerFuture != null) {
         ThreadPools.ensureRunning(this.latencyTimerFuture,
             "Latency timer thread has exited, cannot guarantee latency target");
       }
   ```

##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/CompactionExecutorsMetrics.java
##########
@@ -68,8 +69,9 @@ public CompactionExecutorsMetrics() {
         ThreadPools.createScheduledExecutorService(1, "compactionExecutorsMetricsPoller", false);
     Runtime.getRuntime().addShutdownHook(new Thread(scheduler::shutdownNow));
     long minimumRefreshDelay = TimeUnit.SECONDS.toMillis(5);
-    scheduler.scheduleAtFixedRate(this::update, minimumRefreshDelay, minimumRefreshDelay,
-        TimeUnit.MILLISECONDS);
+    @SuppressWarnings("unused")
+    ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(this::update, minimumRefreshDelay,

Review comment:
       Why ignore this one?

##########
File path: core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
##########
@@ -380,12 +382,16 @@ private void reschedule(SendTask task) {
         queue(mutations);
     };
 
-    threadPool.scheduleAtFixedRate(failureHandler, 250, 250, MILLISECONDS);
+    failureTaskFuture = threadPool.scheduleAtFixedRate(failureHandler, 250, 250, MILLISECONDS);
   }
 
   @Override
   public Iterator<Result> write(Iterator<ConditionalMutation> mutations) {
 
+    if (failureTaskFuture.isDone()) {
+      throw new RuntimeException("Background task that re-queues failed mutations has exited.");
+    }

Review comment:
       ```suggestion
      ThreadPools.ensureRunning(failureTaskFuture, "Background task that re-queues failed mutations has exited.");
   ```

##########
File path: core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
##########
@@ -40,13 +44,92 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@SuppressFBWarnings(value = "RV_EXCEPTION_NOT_THROWN",
+    justification = "Throwing Error for it to be caught by AccumuloUncaughtExceptionHandler")
 public class ThreadPools {
 
+  public static class ExecutionError extends Error {
+
+    private static final long serialVersionUID = 1L;
+
+    public ExecutionError(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
   private static final Logger LOG = LoggerFactory.getLogger(ThreadPools.class);
 
   // the number of seconds before we allow a thread to terminate with non-use.
   public static final long DEFAULT_TIMEOUT_MILLISECS = 180000L;
 
+  private static final ThreadPoolExecutor SCHEDULED_FUTURE_CHECKER_POOL =
+      createFixedThreadPool(1, "Scheduled Future Checker", false);
+
+  private static final ConcurrentLinkedQueue<ScheduledFuture<?>> CRITICAL_RUNNING_TASKS =
+      new ConcurrentLinkedQueue<>();
+
+  private static Runnable CRITICAL_TASK_CHECKER = new Runnable() {
+    @Override
+    public void run() {
+      while (true) {
+        Iterator<ScheduledFuture<?>> futures = CRITICAL_RUNNING_TASKS.iterator();
+        while (futures.hasNext()) {
+          ScheduledFuture<?> criticalTask = futures.next();
+          // Calling get() on a ScheduledFuture will block unless that scheduled task has
+          // completed. We call isDone() here instead. If the scheduled task is done then
+          // either it was a one-shot task, cancelled or an exception was thrown.
+          if (criticalTask.isDone()) {
+            // Now call get() to see if we get an exception.
+            try {
+              criticalTask.get();
+              // If we get here, then a scheduled task exited but did not throw an error
+              // or get canceled. This was likely a one-shot scheduled task (I don't think
+              // we can tell if it's one-shot or not, I think we have to assume that it is
+              // and that a recurring task would not normally be complete).

Review comment:
       We could assume that a one-shot task should never be added to the list, so that even if a task exits without an exception it's still considered an error that leaves the process unhealthy. An unexpected exit could indicate a bug in the scheduled task, so maybe throw an error or log an error.

##########
File path: core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
##########
@@ -576,14 +583,18 @@ private synchronized void addFailedMutations(MutationSet failedMutations) {
     private MutationSet recentFailures = null;
     private long initTime;
     private final Runnable task;
+    private final ScheduledFuture<?> future;
 
     FailedMutations() {
       task =
           Threads.createNamedRunnable("failed mutationBatchWriterLatencyTimers handler", this::run);
-      executor.scheduleWithFixedDelay(task, 0, 500, MILLISECONDS);
+      future = executor.scheduleWithFixedDelay(task, 0, 500, MILLISECONDS);
     }
 
     private MutationSet init() {
+      if (future.isDone()) {
+        throw new RuntimeException("Background task that re-queues failed mutations has exited.");
+      }

Review comment:
       ```suggestion
        ThreadPools.ensureRunning(future, "Background task that re-queues failed mutations has exited.");
   ```
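
A minimal sketch of what such a helper might look like. Only the name `ensureRunning` and its two arguments come from the suggestion above; the class name `FutureChecks`, the exact signature, and the choice of `IllegalStateException` are assumptions for illustration, not the PR's implementation.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class; not part of Accumulo.
class FutureChecks {

  // Throws if the background task behind 'future' is no longer running.
  // isDone() is true after normal completion, cancellation, or failure.
  static void ensureRunning(Future<?> future, String message) {
    if (future.isDone()) {
      throw new IllegalStateException(message);
    }
  }

  // Returns true if ensureRunning accepts a live recurring task and
  // rejects the same task once it has been cancelled.
  static boolean demo() {
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    try {
      ScheduledFuture<?> future =
          executor.scheduleWithFixedDelay(() -> {}, 0, 500, TimeUnit.MILLISECONDS);
      ensureRunning(future, "should not throw while the task is scheduled");
      future.cancel(false);
      try {
        ensureRunning(future, "Background task has exited.");
        return false;
      } catch (IllegalStateException expected) {
        return true;
      }
    } finally {
      executor.shutdownNow();
    }
  }
}
```

Centralizing the check keeps each call site to one line and makes it easy to change the thrown type later.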

##########
File path: core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
##########
@@ -40,13 +44,92 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@SuppressFBWarnings(value = "RV_EXCEPTION_NOT_THROWN",
+    justification = "Throwing Error for it to be caught by AccumuloUncaughtExceptionHandler")
 public class ThreadPools {
 
+  public static class ExecutionError extends Error {
+
+    private static final long serialVersionUID = 1L;
+
+    public ExecutionError(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
   private static final Logger LOG = LoggerFactory.getLogger(ThreadPools.class);
 
   // the number of seconds before we allow a thread to terminate with non-use.
   public static final long DEFAULT_TIMEOUT_MILLISECS = 180000L;
 
+  private static final ThreadPoolExecutor SCHEDULED_FUTURE_CHECKER_POOL =
+      createFixedThreadPool(1, "Scheduled Future Checker", false);
+
+  private static final ConcurrentLinkedQueue<ScheduledFuture<?>> CRITICAL_RUNNING_TASKS =
+      new ConcurrentLinkedQueue<>();

Review comment:
       This suggestion goes with another comment. We could have another list of non-critical ones to watch. For the non-critical ones we would throw a RuntimeException instead of an Error.
   
   ```suggestion
     private static final ConcurrentLinkedQueue<ScheduledFuture<?>> CRITICAL_RUNNING_TASKS =
         new ConcurrentLinkedQueue<>();
     private static final ConcurrentLinkedQueue<ScheduledFuture<?>> NON_CRITICAL_RUNNING_TASKS =
         new ConcurrentLinkedQueue<>();
   ```

##########
File path: server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java
##########
@@ -155,7 +156,8 @@ public void registerMetrics(final MeterRegistry registry) {
         ThreadPools.createScheduledExecutorService(1, "fateMetricsPoller", false);
     Runtime.getRuntime().addShutdownHook(new Thread(scheduler::shutdownNow));
 
-    scheduler.scheduleAtFixedRate(() -> {
+    @SuppressWarnings("unused")
+    ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(() -> {

Review comment:
       Why ignore failures with this one? If it's because it's not critical, then maybe we need another ThreadPools method to watch non-critical scheduled tasks so that we can still get important debug info if they die. The one for non-critical tasks could do almost the same as the critical one but not throw an execution error, just throw a runtime exception instead. There could be two lists (critical and non-critical), both monitored by the same thread.







[GitHub] [accumulo] dlmarion commented on a change in pull request #2524: Beginning to address unchecked Futures

Posted by GitBox <gi...@apache.org>.
dlmarion commented on a change in pull request #2524:
URL: https://github.com/apache/accumulo/pull/2524#discussion_r815021592



##########
File path: core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
##########
@@ -47,6 +51,70 @@
   // the number of seconds before we allow a thread to terminate with non-use.
   public static final long DEFAULT_TIMEOUT_MILLISECS = 180000L;
 
+  private static final ThreadPoolExecutor SCHEDULED_FUTURE_CHECKER_POOL =
+      createFixedThreadPool(1, "Scheduled Future Checker", false);
+
+  private static final ConcurrentLinkedQueue<ScheduledFuture<?>> CRITICAL_RUNNING_TASKS =
+      new ConcurrentLinkedQueue<>();
+
+  private static Runnable CRITICAL_TASK_CHECKER = new Runnable() {
+    @Override
+    public void run() {
+      while (true) {
+        Iterator<ScheduledFuture<?>> futures = CRITICAL_RUNNING_TASKS.iterator();
+        while (futures.hasNext()) {
+          ScheduledFuture<?> criticalTask = futures.next();
+          // Calling get() on a ScheduledFuture will block unless that scheduled task has
+          // completed. We call isDone() here instead. If the scheduled task is done then
+          // either it was a one-shot task, cancelled or an exception was thrown.
+          if (criticalTask.isDone()) {
+            // Now call get() to see if we get an exception.
+            try {
+              criticalTask.get();
+              // If we get here, then a scheduled task exited but did not throw an error
+              // or get canceled. This was likely a one-shot scheduled task.
+              if (!CRITICAL_RUNNING_TASKS.remove(criticalTask)) {
+                LOG.warn("Unable to remove task from list of watched critical tasks");
+              }
+            } catch (ExecutionException ee) {
+              // An exception was thrown in the critical task. Throw the error here, which
+              // will then be caught by the AccumuloUncaughtExceptionHandler which will
+              // log the error and terminate the VM.
+              throw new RuntimeException("Critical scheduled task failed.", ee.getCause());

Review comment:
       My thinking is that if these are critical background threads, then regardless of whether it's an Exception or an Error, the critical background task that should be running, isn't. I didn't wrap all scheduled tasks with this method, just the critical ones.







[GitHub] [accumulo] keith-turner commented on a change in pull request #2524: Beginning to address unchecked Futures

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2524:
URL: https://github.com/apache/accumulo/pull/2524#discussion_r815020581



##########
File path: core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
##########
@@ -47,6 +51,70 @@
   // the number of seconds before we allow a thread to terminate with non-use.
   public static final long DEFAULT_TIMEOUT_MILLISECS = 180000L;
 
+  private static final ThreadPoolExecutor SCHEDULED_FUTURE_CHECKER_POOL =
+      createFixedThreadPool(1, "Scheduled Future Checker", false);
+
+  private static final ConcurrentLinkedQueue<ScheduledFuture<?>> CRITICAL_RUNNING_TASKS =
+      new ConcurrentLinkedQueue<>();
+
+  private static Runnable CRITICAL_TASK_CHECKER = new Runnable() {
+    @Override
+    public void run() {
+      while (true) {
+        Iterator<ScheduledFuture<?>> futures = CRITICAL_RUNNING_TASKS.iterator();
+        while (futures.hasNext()) {
+          ScheduledFuture<?> criticalTask = futures.next();
+          // Calling get() on a ScheduledFuture will block unless that scheduled task has
+          // completed. We call isDone() here instead. If the scheduled task is done then
+          // either it was a one-shot task, cancelled or an exception was thrown.
+          if (criticalTask.isDone()) {
+            // Now call get() to see if we get an exception.
+            try {
+              criticalTask.get();
+              // If we get here, then a scheduled task exited but did not throw an error
+              // or get canceled. This was likely a one-shot scheduled task.
+              if (!CRITICAL_RUNNING_TASKS.remove(criticalTask)) {
+                LOG.warn("Unable to remove task from list of watched critical tasks");
+              }
+            } catch (ExecutionException ee) {
+              // An exception was thrown in the critical task. Throw the error here, which
+              // will then be caught by the AccumuloUncaughtExceptionHandler which will
+              // log the error and terminate the VM.
+              throw new RuntimeException("Critical scheduled task failed.", ee.getCause());

Review comment:
       Unresolved this comment because I made a comment unrelated to the original problem, though it was inspired by the problem.







[GitHub] [accumulo] dlmarion commented on a change in pull request #2524: Beginning to address unchecked Futures

Posted by GitBox <gi...@apache.org>.
dlmarion commented on a change in pull request #2524:
URL: https://github.com/apache/accumulo/pull/2524#discussion_r818019410



##########
File path: core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
##########
@@ -40,13 +44,129 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@SuppressFBWarnings(value = "RV_EXCEPTION_NOT_THROWN",
+    justification = "Throwing Error for it to be caught by AccumuloUncaughtExceptionHandler")
 public class ThreadPools {
 
+  public static class ExecutionError extends Error {
+
+    private static final long serialVersionUID = 1L;
+
+    public ExecutionError(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
   private static final Logger LOG = LoggerFactory.getLogger(ThreadPools.class);
 
   // the number of seconds before we allow a thread to terminate with non-use.
   public static final long DEFAULT_TIMEOUT_MILLISECS = 180000L;
 
+  private static final ThreadPoolExecutor SCHEDULED_FUTURE_CHECKER_POOL =
+      createFixedThreadPool(1, "Scheduled Future Checker", false);
+
+  private static final ConcurrentLinkedQueue<ScheduledFuture<?>> CRITICAL_RUNNING_TASKS =
+      new ConcurrentLinkedQueue<>();
+
+  private static final ConcurrentLinkedQueue<ScheduledFuture<?>> NON_CRITICAL_RUNNING_TASKS =
+      new ConcurrentLinkedQueue<>();
+
+  private static Runnable TASK_CHECKER = new Runnable() {
+    @Override
+    public void run() {
+      while (true) {
+        Iterator<ScheduledFuture<?>> criticalTasks = CRITICAL_RUNNING_TASKS.iterator();
+        while (criticalTasks.hasNext()) {
+          checkTaskFailed(criticalTasks.next(), true);

Review comment:
       Addressed in 9b53ceb







[GitHub] [accumulo] dlmarion commented on a change in pull request #2524: Beginning to address unchecked Futures

Posted by GitBox <gi...@apache.org>.
dlmarion commented on a change in pull request #2524:
URL: https://github.com/apache/accumulo/pull/2524#discussion_r817677828



##########
File path: core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
##########
@@ -40,13 +44,92 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@SuppressFBWarnings(value = "RV_EXCEPTION_NOT_THROWN",
+    justification = "Throwing Error for it to be caught by AccumuloUncaughtExceptionHandler")
 public class ThreadPools {
 
+  public static class ExecutionError extends Error {
+
+    private static final long serialVersionUID = 1L;
+
+    public ExecutionError(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
   private static final Logger LOG = LoggerFactory.getLogger(ThreadPools.class);
 
   // the number of seconds before we allow a thread to terminate with non-use.
   public static final long DEFAULT_TIMEOUT_MILLISECS = 180000L;
 
+  private static final ThreadPoolExecutor SCHEDULED_FUTURE_CHECKER_POOL =
+      createFixedThreadPool(1, "Scheduled Future Checker", false);
+
+  private static final ConcurrentLinkedQueue<ScheduledFuture<?>> CRITICAL_RUNNING_TASKS =
+      new ConcurrentLinkedQueue<>();

Review comment:
       Addressed in ab25541







[GitHub] [accumulo] dlmarion commented on a change in pull request #2524: Beginning to address unchecked Futures

Posted by GitBox <gi...@apache.org>.
dlmarion commented on a change in pull request #2524:
URL: https://github.com/apache/accumulo/pull/2524#discussion_r817678443



##########
File path: core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
##########
@@ -576,14 +583,18 @@ private synchronized void addFailedMutations(MutationSet failedMutations) {
     private MutationSet recentFailures = null;
     private long initTime;
     private final Runnable task;
+    private final ScheduledFuture<?> future;
 
     FailedMutations() {
       task =
           Threads.createNamedRunnable("failed mutationBatchWriterLatencyTimers handler", this::run);
-      executor.scheduleWithFixedDelay(task, 0, 500, MILLISECONDS);
+      future = executor.scheduleWithFixedDelay(task, 0, 500, MILLISECONDS);
     }
 
     private MutationSet init() {
+      if (future.isDone()) {
+        throw new RuntimeException("Background task that re-queues failed mutations has exited.");
+      }

Review comment:
       Addressed in ab25541







[GitHub] [accumulo] keith-turner commented on a change in pull request #2524: Beginning to address unchecked Futures

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2524:
URL: https://github.com/apache/accumulo/pull/2524#discussion_r816928406



##########
File path: core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
##########
@@ -40,13 +44,92 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@SuppressFBWarnings(value = "RV_EXCEPTION_NOT_THROWN",
+    justification = "Throwing Error for it to be caught by AccumuloUncaughtExceptionHandler")
 public class ThreadPools {
 
+  public static class ExecutionError extends Error {
+
+    private static final long serialVersionUID = 1L;
+
+    public ExecutionError(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
   private static final Logger LOG = LoggerFactory.getLogger(ThreadPools.class);
 
   // the number of seconds before we allow a thread to terminate with non-use.
   public static final long DEFAULT_TIMEOUT_MILLISECS = 180000L;
 
+  private static final ThreadPoolExecutor SCHEDULED_FUTURE_CHECKER_POOL =
+      createFixedThreadPool(1, "Scheduled Future Checker", false);
+
+  private static final ConcurrentLinkedQueue<ScheduledFuture<?>> CRITICAL_RUNNING_TASKS =
+      new ConcurrentLinkedQueue<>();

Review comment:
       For non-critical tasks we probably do not want to throw a runtime exception; we would probably just want to log something instead.
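
One way to picture the distinction is the sketch below, which throws an `Error` for a failed critical task and only logs for a non-critical one. The class and method names are hypothetical, `java.util.logging` stands in for the project's logger, and a plain `Error` stands in for the PR's `ExecutionError`; this is an illustration under those assumptions, not the merged code.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical sketch; class and method names are not from the PR.
class TaskFailurePolicy {

  private static final Logger LOG = Logger.getLogger(TaskFailurePolicy.class.getName());

  // Inspects a future. Returns true if the task failed and the failure was
  // only logged; throws an Error if the failed task was critical.
  static boolean handleIfFailed(Future<?> future, boolean critical) {
    if (!future.isDone()) {
      return false; // still running, nothing to report
    }
    try {
      future.get(); // does not block because isDone() returned true
      return false;
    } catch (ExecutionException ee) {
      if (critical) {
        // An Error is thrown so that a handler which halts only on Errors reacts.
        throw new Error("Critical scheduled task failed.", ee.getCause());
      }
      LOG.log(Level.SEVERE, "Non-critical scheduled task failed", ee.getCause());
      return true;
    } catch (CancellationException ce) {
      return false; // cancelled on purpose, not a failure
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  // Builds an already-failed future for demonstration purposes.
  static Future<?> failedFuture() {
    CompletableFuture<Void> f = new CompletableFuture<>();
    f.completeExceptionally(new IllegalStateException("boom"));
    return f;
  }
}
```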







[GitHub] [accumulo] dlmarion commented on a change in pull request #2524: Beginning to address unchecked Futures

Posted by GitBox <gi...@apache.org>.
dlmarion commented on a change in pull request #2524:
URL: https://github.com/apache/accumulo/pull/2524#discussion_r818018348



##########
File path: core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
##########
@@ -40,13 +44,92 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@SuppressFBWarnings(value = "RV_EXCEPTION_NOT_THROWN",
+    justification = "Throwing Error for it to be caught by AccumuloUncaughtExceptionHandler")
 public class ThreadPools {
 
+  public static class ExecutionError extends Error {
+
+    private static final long serialVersionUID = 1L;
+
+    public ExecutionError(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
   private static final Logger LOG = LoggerFactory.getLogger(ThreadPools.class);
 
   // the number of seconds before we allow a thread to terminate with non-use.
   public static final long DEFAULT_TIMEOUT_MILLISECS = 180000L;
 
+  private static final ThreadPoolExecutor SCHEDULED_FUTURE_CHECKER_POOL =
+      createFixedThreadPool(1, "Scheduled Future Checker", false);
+
+  private static final ConcurrentLinkedQueue<ScheduledFuture<?>> CRITICAL_RUNNING_TASKS =
+      new ConcurrentLinkedQueue<>();
+
+  private static Runnable CRITICAL_TASK_CHECKER = new Runnable() {
+    @Override
+    public void run() {
+      while (true) {
+        Iterator<ScheduledFuture<?>> futures = CRITICAL_RUNNING_TASKS.iterator();
+        while (futures.hasNext()) {
+          ScheduledFuture<?> criticalTask = futures.next();
+          // Calling get() on a ScheduledFuture will block unless that scheduled task has
+          // completed. We call isDone() here instead. If the scheduled task is done then
+          // either it was a one-shot task, cancelled or an exception was thrown.
+          if (criticalTask.isDone()) {
+            // Now call get() to see if we get an exception.
+            try {
+              criticalTask.get();
+              // If we get here, then a scheduled task exited but did not throw an error
+              // or get canceled. This was likely a one-shot scheduled task (I don't think
+              // we can tell if it's one-shot or not, I think we have to assume that it is
+              // and that a recurring task would not normally be complete).

Review comment:
       That's right. In fact, `Future.get()` will *not* return (it will block) for a recurring ScheduledFuture that is still scheduled, which is why I check whether `Future.isDone()` returns `true` first. There is this note in https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/Future.html:
   
   ```
   The result can only be retrieved using method get when the computation has completed, blocking if necessary until it is ready
   ```
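
The blocking behaviour can be observed with a small standalone program. The sketch below uses only `java.util.concurrent`; the class and method names are made up for illustration. It shows `get` timing out on a healthy recurring task, and `get` returning promptly with an `ExecutionException` once the task has thrown.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Illustrative demo class; not part of the PR.
class ScheduledFutureGetDemo {

  // Returns true if get() timed out on a healthy recurring task,
  // which means an untimed get() would have blocked.
  static boolean getBlocksWhileRecurring() {
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    try {
      ScheduledFuture<?> future =
          executor.scheduleWithFixedDelay(() -> {}, 0, 10, TimeUnit.MILLISECONDS);
      try {
        future.get(200, TimeUnit.MILLISECONDS);
        return false;
      } catch (TimeoutException expected) {
        return !future.isDone();
      } catch (InterruptedException | ExecutionException e) {
        return false;
      }
    } finally {
      executor.shutdownNow();
    }
  }

  // Returns the cause reported by get() after a recurring task has thrown,
  // or null if no failure was observed.
  static Throwable causeAfterFailure() {
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    try {
      ScheduledFuture<?> future = executor.scheduleWithFixedDelay(() -> {
        throw new IllegalStateException("task failed");
      }, 0, 10, TimeUnit.MILLISECONDS);
      try {
        future.get(5, TimeUnit.SECONDS); // the first run throws, so this returns early
        return null;
      } catch (ExecutionException ee) {
        return future.isDone() ? ee.getCause() : null;
      } catch (InterruptedException | TimeoutException e) {
        return null;
      }
    } finally {
      executor.shutdownNow();
    }
  }
}
```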







[GitHub] [accumulo] dlmarion commented on a change in pull request #2524: Beginning to address unchecked Futures

Posted by GitBox <gi...@apache.org>.
dlmarion commented on a change in pull request #2524:
URL: https://github.com/apache/accumulo/pull/2524#discussion_r814983375



##########
File path: core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
##########
@@ -47,6 +51,70 @@
   // the number of seconds before we allow a thread to terminate with non-use.
   public static final long DEFAULT_TIMEOUT_MILLISECS = 180000L;
 
+  private static final ThreadPoolExecutor SCHEDULED_FUTURE_CHECKER_POOL =
+      createFixedThreadPool(1, "Scheduled Future Checker", false);
+
+  private static final ConcurrentLinkedQueue<ScheduledFuture<?>> CRITICAL_RUNNING_TASKS =
+      new ConcurrentLinkedQueue<>();
+
+  private static Runnable CRITICAL_TASK_CHECKER = new Runnable() {
+    @Override
+    public void run() {
+      while (true) {
+        Iterator<ScheduledFuture<?>> futures = CRITICAL_RUNNING_TASKS.iterator();
+        while (futures.hasNext()) {
+          ScheduledFuture<?> criticalTask = futures.next();
+          // Calling get() on a ScheduledFuture will block unless that scheduled task has
+          // completed. We call isDone() here instead. If the scheduled task is done then
+          // either it was a one-shot task, cancelled or an exception was thrown.
+          if (criticalTask.isDone()) {
+            // Now call get() to see if we get an exception.
+            try {
+              criticalTask.get();
+              // If we get here, then a scheduled task exited but did not throw an error
+              // or get canceled. This was likely a one-shot scheduled task.
+              if (!CRITICAL_RUNNING_TASKS.remove(criticalTask)) {
+                LOG.warn("Unable to remove task from list of watched critical tasks");
+              }
+            } catch (ExecutionException ee) {
+              // An exception was thrown in the critical task. Throw the error here, which
+              // will then be caught by the AccumuloUncaughtExceptionHandler which will
+              // log the error and terminate the VM.
+              throw new RuntimeException("Critical scheduled task failed.", ee.getCause());

Review comment:
       See ccd2673







[GitHub] [accumulo] dlmarion commented on a change in pull request #2524: Beginning to address unchecked Futures

Posted by GitBox <gi...@apache.org>.
dlmarion commented on a change in pull request #2524:
URL: https://github.com/apache/accumulo/pull/2524#discussion_r814976598



##########
File path: core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
##########
@@ -47,6 +51,70 @@
   // the number of seconds before we allow a thread to terminate with non-use.
   public static final long DEFAULT_TIMEOUT_MILLISECS = 180000L;
 
+  private static final ThreadPoolExecutor SCHEDULED_FUTURE_CHECKER_POOL =
+      createFixedThreadPool(1, "Scheduled Future Checker", false);
+
+  private static final ConcurrentLinkedQueue<ScheduledFuture<?>> CRITICAL_RUNNING_TASKS =
+      new ConcurrentLinkedQueue<>();
+
+  private static Runnable CRITICAL_TASK_CHECKER = new Runnable() {
+    @Override
+    public void run() {
+      while (true) {
+        Iterator<ScheduledFuture<?>> futures = CRITICAL_RUNNING_TASKS.iterator();
+        while (futures.hasNext()) {
+          ScheduledFuture<?> criticalTask = futures.next();
+          // Calling get() on a ScheduledFuture will block unless that scheduled task has
+          // completed. We call isDone() here instead. If the scheduled task is done then
+          // either it was a one-shot task, cancelled or an exception was thrown.
+          if (criticalTask.isDone()) {
+            // Now call get() to see if we get an exception.
+            try {
+              criticalTask.get();
+              // If we get here, then a scheduled task exited but did not throw an error
+              // or get canceled. This was likely a one-shot scheduled task.
+              if (!CRITICAL_RUNNING_TASKS.remove(criticalTask)) {
+                LOG.warn("Unable to remove task from list of watched critical tasks");
+              }
+            } catch (ExecutionException ee) {
+              // An exception was thrown in the critical task. Throw the error here, which
+              // will then be caught by the AccumuloUncaughtExceptionHandler which will
+              // log the error and terminate the VM.
+              throw new RuntimeException("Critical scheduled task failed.", ee.getCause());

Review comment:
       > The uncaughtexceptionhandler only halts the VM when it's an error. Since this is not an error, the VM will not be halted and this thread that checks futures will be dead.
   
   Good point - I forgot about that. Since this is deemed a critical background thread, do you think we should throw an Error here to terminate the VM?
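
To illustrate why the thrown type matters, here is a stand-in handler that reacts differently to `Error` and to other throwables, as the quoted comment describes. It is not `AccumuloUncaughtExceptionHandler`; the class name is hypothetical, and it records the decision in a flag instead of halting the VM so that the sketch stays safe to run.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Stand-in for illustration only; this is not AccumuloUncaughtExceptionHandler.
class HaltOnErrorHandler implements Thread.UncaughtExceptionHandler {

  final AtomicBoolean wouldHalt = new AtomicBoolean(false);

  @Override
  public void uncaughtException(Thread t, Throwable e) {
    if (e instanceof Error) {
      // A real handler might call Runtime.getRuntime().halt(...) here.
      wouldHalt.set(true);
    } else {
      // The thread is already dead at this point; the process keeps running.
      System.err.println("Thread " + t.getName() + " died: " + e);
    }
  }

  // Runs 'body' on a new thread with a fresh handler and reports
  // whether that handler would have halted the VM.
  static boolean wouldHaltFor(Runnable body) {
    HaltOnErrorHandler handler = new HaltOnErrorHandler();
    Thread thread = new Thread(body, "checker");
    thread.setUncaughtExceptionHandler(handler);
    thread.start();
    try {
      thread.join();
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
    }
    return handler.wouldHalt.get();
  }
}
```

With a handler of this shape, a checker thread that throws a `RuntimeException` simply dies and stops checking, while one that throws an `Error` triggers the halt path.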







[GitHub] [accumulo] keith-turner commented on a change in pull request #2524: Beginning to address unchecked Futures

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2524:
URL: https://github.com/apache/accumulo/pull/2524#discussion_r817961963



##########
File path: core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
##########
@@ -40,13 +44,129 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@SuppressFBWarnings(value = "RV_EXCEPTION_NOT_THROWN",
+    justification = "Throwing Error for it to be caught by AccumuloUncaughtExceptionHandler")
 public class ThreadPools {
 
+  public static class ExecutionError extends Error {
+
+    private static final long serialVersionUID = 1L;
+
+    public ExecutionError(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
   private static final Logger LOG = LoggerFactory.getLogger(ThreadPools.class);
 
   // the number of seconds before we allow a thread to terminate with non-use.
   public static final long DEFAULT_TIMEOUT_MILLISECS = 180000L;
 
+  private static final ThreadPoolExecutor SCHEDULED_FUTURE_CHECKER_POOL =
+      createFixedThreadPool(1, "Scheduled Future Checker", false);
+
+  private static final ConcurrentLinkedQueue<ScheduledFuture<?>> CRITICAL_RUNNING_TASKS =
+      new ConcurrentLinkedQueue<>();
+
+  private static final ConcurrentLinkedQueue<ScheduledFuture<?>> NON_CRITICAL_RUNNING_TASKS =
+      new ConcurrentLinkedQueue<>();
+
+  private static Runnable TASK_CHECKER = new Runnable() {
+    @Override
+    public void run() {
+      while (true) {
+        Iterator<ScheduledFuture<?>> criticalTasks = CRITICAL_RUNNING_TASKS.iterator();
+        while (criticalTasks.hasNext()) {
+          checkTaskFailed(criticalTasks.next(), true);

Review comment:
       If we pass the list to the method, then it would make the remove logic in the method a bit simpler.
   
   ```suggestion
             checkTaskFailed(criticalTasks.next(), true, CRITICAL_RUNNING_TASKS);
   ```
   
   Alternatively, if we make the method return true when it should be removed, then we could do the following to have the iterator remove it.
   
   ```suggestion
             if (checkTaskFailed(criticalTasks.next(), true)) {
               criticalTasks.remove();
             }
   ```
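
A self-contained sketch of the second alternative, where the check returns a boolean and the iterator performs the removal. The names `WatchedTasks` and `sweep` are hypothetical, and the check is reduced to `isDone()`; the failure handling from the PR is left out to keep the focus on the removal.

```java
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;

// Hypothetical sketch; not the PR's implementation.
class WatchedTasks {

  // Simplified check: returns true when the future should no longer be watched.
  static boolean checkTaskFailed(Future<?> future) {
    return future.isDone();
  }

  // Makes one pass over the queue and removes finished futures through the
  // iterator, so the check itself never needs a reference to the queue.
  static int sweep(ConcurrentLinkedQueue<Future<?>> tasks) {
    int removed = 0;
    Iterator<Future<?>> it = tasks.iterator();
    while (it.hasNext()) {
      if (checkTaskFailed(it.next())) {
        it.remove(); // supported by ConcurrentLinkedQueue's iterator
        removed++;
      }
    }
    return removed;
  }
}
```

Removing through the iterator also avoids a second scan of the queue, which `remove(Object)` would perform.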
   
   

##########
File path: core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
##########
@@ -40,13 +44,129 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@SuppressFBWarnings(value = "RV_EXCEPTION_NOT_THROWN",
+    justification = "Throwing Error for it to be caught by AccumuloUncaughtExceptionHandler")
 public class ThreadPools {
 
+  public static class ExecutionError extends Error {
+
+    private static final long serialVersionUID = 1L;
+
+    public ExecutionError(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
   private static final Logger LOG = LoggerFactory.getLogger(ThreadPools.class);
 
   // the number of seconds before we allow a thread to terminate with non-use.
   public static final long DEFAULT_TIMEOUT_MILLISECS = 180000L;
 
+  private static final ThreadPoolExecutor SCHEDULED_FUTURE_CHECKER_POOL =
+      createFixedThreadPool(1, "Scheduled Future Checker", false);
+
+  private static final ConcurrentLinkedQueue<ScheduledFuture<?>> CRITICAL_RUNNING_TASKS =
+      new ConcurrentLinkedQueue<>();
+
+  private static final ConcurrentLinkedQueue<ScheduledFuture<?>> NON_CRITICAL_RUNNING_TASKS =
+      new ConcurrentLinkedQueue<>();
+
+  private static Runnable TASK_CHECKER = new Runnable() {
+    @Override
+    public void run() {
+      while (true) {
+        Iterator<ScheduledFuture<?>> criticalTasks = CRITICAL_RUNNING_TASKS.iterator();
+        while (criticalTasks.hasNext()) {
+          checkTaskFailed(criticalTasks.next(), true);
+        }
+        Iterator<ScheduledFuture<?>> nonCriticalTasks = NON_CRITICAL_RUNNING_TASKS.iterator();
+        while (nonCriticalTasks.hasNext()) {
+          checkTaskFailed(nonCriticalTasks.next(), false);
+        }
+        try {
+          TimeUnit.MINUTES.sleep(1);
+        } catch (InterruptedException ie) {
+          // This thread was interrupted by something while sleeping. We don't want to exit
+          // this thread, so reset the interrupt state on this thread and keep going.
+          Thread.interrupted();
+        }
+      }
+    }
+  };
+
+  private static void checkTaskFailed(Future<?> future, boolean critical) {
+    // Calling get() on a ScheduledFuture will block unless that scheduled task has
+    // completed. We call isDone() here instead. If the scheduled task is done then
+    // either it was a one-shot task, cancelled or an exception was thrown.
+    if (future.isDone()) {
+      // Now call get() to see if we get an exception.
+      try {
+        future.get();
+        // If we get here, then a scheduled task exited but did not throw an error
+        // or get canceled. This was likely a one-shot scheduled task (I don't think
+        // we can tell if it's one-shot or not, I think we have to assume that it is
+        // and that a recurring task would not normally be complete).
+        boolean removed = critical ? CRITICAL_RUNNING_TASKS.remove(future)
+            : NON_CRITICAL_RUNNING_TASKS.remove(future);
+        if (!removed) {
+          LOG.warn("Unable to remove task from list of watched tasks");
+        }
+      } catch (ExecutionException ee) {
+        // An exception was thrown in the critical task. Throw the error here, which
+        // will then be caught by the AccumuloUncaughtExceptionHandler which will
+        // log the error and terminate the VM.
+        if (critical) {
+          throw new ExecutionError("Critical scheduled task failed.", ee);
+        } else {
+          LOG.error("Non-critical task failed", ee);

Review comment:
       ```suggestion
             LOG.error("Non-critical scheduled background task failed", ee);
   ```







[GitHub] [accumulo] dlmarion merged pull request #2524: Check scheduled tasks periodically for errors

Posted by GitBox <gi...@apache.org>.
dlmarion merged pull request #2524:
URL: https://github.com/apache/accumulo/pull/2524


   





[GitHub] [accumulo] dlmarion commented on a change in pull request #2524: Beginning to address unchecked Futures

Posted by GitBox <gi...@apache.org>.
dlmarion commented on a change in pull request #2524:
URL: https://github.com/apache/accumulo/pull/2524#discussion_r817651838



##########
File path: core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
##########
@@ -40,13 +44,92 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@SuppressFBWarnings(value = "RV_EXCEPTION_NOT_THROWN",
+    justification = "Throwing Error for it to be caught by AccumuloUncaughtExceptionHandler")
 public class ThreadPools {
 
+  public static class ExecutionError extends Error {
+
+    private static final long serialVersionUID = 1L;
+
+    public ExecutionError(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
   private static final Logger LOG = LoggerFactory.getLogger(ThreadPools.class);
 
   // the number of seconds before we allow a thread to terminate with non-use.
   public static final long DEFAULT_TIMEOUT_MILLISECS = 180000L;
 
+  private static final ThreadPoolExecutor SCHEDULED_FUTURE_CHECKER_POOL =
+      createFixedThreadPool(1, "Scheduled Future Checker", false);
+
+  private static final ConcurrentLinkedQueue<ScheduledFuture<?>> CRITICAL_RUNNING_TASKS =
+      new ConcurrentLinkedQueue<>();
+
+  private static Runnable CRITICAL_TASK_CHECKER = new Runnable() {
+    @Override
+    public void run() {
+      while (true) {
+        Iterator<ScheduledFuture<?>> futures = CRITICAL_RUNNING_TASKS.iterator();
+        while (futures.hasNext()) {
+          ScheduledFuture<?> criticalTask = futures.next();
+          // Calling get() on a ScheduledFuture will block unless that scheduled task has
+          // completed. We call isDone() here instead. If the scheduled task is done then
+          // either it was a one-shot task, cancelled or an exception was thrown.
+          if (criticalTask.isDone()) {
+            // Now call get() to see if we get an exception.
+            try {
+              criticalTask.get();
+              // If we get here, then a scheduled task exited but did not throw an error
+              // or get canceled. This was likely a one-shot scheduled task (I don't think
+              // we can tell if it's one-shot or not, I think we have to assume that it is
+              // and that a recurring task would not normally be complete).

Review comment:
       There are places in the code where we schedule a task to run once, so if it exits normally, I don't think it's an error. Here's an example: https://github.com/apache/accumulo/blob/main/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java#L223. If an error is thrown in the one-shot task, this code will handle it. 
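The distinction above can be sketched with plain `java.util.concurrent` calls. This is a minimal illustration, not Accumulo code: the class and method names (`ScheduledFutureDemo`, `outcome`, `oneShotOutcome`, `failingRecurringOutcome`) are hypothetical, and the five-second polling bound is an arbitrary assumption. It shows that a one-shot task that exits normally and a recurring task that throws both end up with `isDone()` returning true, and that only `get()` tells them apart.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class ScheduledFutureDemo {

  // Polls isDone() for up to five seconds, then reports how the task ended.
  static String outcome(ScheduledFuture<?> future) {
    try {
      long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
      while (!future.isDone() && System.nanoTime() < deadline) {
        Thread.sleep(10);
      }
      if (!future.isDone()) {
        return "still running";
      }
      future.get(); // returns immediately because the future is already done
      return "completed normally";
    } catch (ExecutionException e) {
      // The task threw; the original exception is the cause.
      return "failed: " + e.getCause().getMessage();
    } catch (CancellationException e) {
      return "cancelled";
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return "interrupted";
    }
  }

  // A task scheduled to run once; it finishes without throwing.
  static String oneShotOutcome() {
    ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
    try {
      return outcome(pool.schedule(() -> {}, 10, TimeUnit.MILLISECONDS));
    } finally {
      pool.shutdownNow();
    }
  }

  // A recurring task that throws on its first run; later runs are suppressed.
  static String failingRecurringOutcome() {
    ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
    try {
      return outcome(pool.scheduleAtFixedRate(() -> {
        throw new IllegalStateException("boom");
      }, 10, 10, TimeUnit.MILLISECONDS));
    } finally {
      pool.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println(oneShotOutcome());
    System.out.println(failingRecurringOutcome());
  }
}
```

In this sketch the one-shot case reports a normal completion, while the recurring case surfaces the thrown exception through `ExecutionException`, which is the path the checker relies on.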







[GitHub] [accumulo] keith-turner commented on a change in pull request #2524: Beginning to address unchecked Futures

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2524:
URL: https://github.com/apache/accumulo/pull/2524#discussion_r815018290



##########
File path: core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
##########
@@ -47,6 +51,70 @@
   // the number of seconds before we allow a thread to terminate with non-use.
   public static final long DEFAULT_TIMEOUT_MILLISECS = 180000L;
 
+  private static final ThreadPoolExecutor SCHEDULED_FUTURE_CHECKER_POOL =
+      createFixedThreadPool(1, "Scheduled Future Checker", false);
+
+  private static final ConcurrentLinkedQueue<ScheduledFuture<?>> CRITICAL_RUNNING_TASKS =
+      new ConcurrentLinkedQueue<>();
+
+  private static Runnable CRITICAL_TASK_CHECKER = new Runnable() {
+    @Override
+    public void run() {
+      while (true) {
+        Iterator<ScheduledFuture<?>> futures = CRITICAL_RUNNING_TASKS.iterator();
+        while (futures.hasNext()) {
+          ScheduledFuture<?> criticalTask = futures.next();
+          // Calling get() on a ScheduledFuture will block unless that scheduled task has
+          // completed. We call isDone() here instead. If the scheduled task is done then
+          // either it was a one-shot task, cancelled or an exception was thrown.
+          if (criticalTask.isDone()) {
+            // Now call get() to see if we get an exception.
+            try {
+              criticalTask.get();
+              // If we get here, then a scheduled task exited but did not throw an error
+              // or get canceled. This was likely a one-shot scheduled task.
+              if (!CRITICAL_RUNNING_TASKS.remove(criticalTask)) {
+                LOG.warn("Unable to remove task from list of watched critical tasks");
+              }
+            } catch (ExecutionException ee) {
+              // An exception was thrown in the critical task. Throw the error here, which
+              // will then be caught by the AccumuloUncaughtExceptionHandler which will
+              // log the error and terminate the VM.
+              throw new RuntimeException("Critical scheduled task failed.", ee.getCause());

Review comment:
       Wondering if we should do something like the following in the uncaught exception handler (UEH) to cover the case where a cause is an Error.
   
   ```java
   import java.lang.Thread.UncaughtExceptionHandler;

   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;

   class AccumuloUncaughtExceptionHandler implements UncaughtExceptionHandler {
   
     private static final Logger LOG = LoggerFactory.getLogger(AccumuloUncaughtExceptionHandler.class);
   
     // Returns true if t, or any throwable in its cause chain, is an Error.
     private static boolean isError(Throwable t) {
       while (t != null) {
         if (t instanceof Error) {
           return true;
         }
         t = t.getCause();
       }
       return false;
     }
   
     @Override
     public void uncaughtException(Thread t, Throwable e) {
       // Check the cause chain first, so an Exception that wraps an Error still halts the VM.
       if (isError(e)) {
         try {
           e.printStackTrace();
           System.err.println("Error thrown in thread: " + t + ", halting VM.");
         } catch (Throwable e1) {
           // If e == OutOfMemoryError, then it's probable that another Error might be
           // thrown when trying to print to System.err.
         } finally {
           Runtime.getRuntime().halt(-1);
         }
       } else if (e instanceof Exception) {
         LOG.error("Caught an Exception in {}. Thread is dead.", t, e);
       }
     }
   }
   
   ```
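To illustrate why walking the cause chain matters: a `RuntimeException` that wraps an `Error` is itself an `Exception` at the top level, so a check that only looks at the outermost throwable would not see the `Error`. The following is a small standalone sketch; the class name `CauseChainDemo` and the method name `hasErrorInChain` are hypothetical and mirror the `isError` helper suggested above.

```java
import java.io.IOException;

public class CauseChainDemo {

  // Returns true if t, or any throwable reachable through getCause(), is an Error.
  static boolean hasErrorInChain(Throwable t) {
    while (t != null) {
      if (t instanceof Error) {
        return true;
      }
      t = t.getCause();
    }
    return false;
  }

  public static void main(String[] args) {
    Throwable wrappedError = new RuntimeException("task failed", new OutOfMemoryError("heap"));
    Throwable plainException = new RuntimeException("task failed", new IOException("disk"));

    // The outermost throwable is an Exception in both cases.
    System.out.println(wrappedError instanceof Exception);
    // Only the first chain contains an Error.
    System.out.println(hasErrorInChain(wrappedError));
    System.out.println(hasErrorInChain(plainException));
  }
}
```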







[GitHub] [accumulo] keith-turner commented on a change in pull request #2524: Beginning to address unchecked Futures

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2524:
URL: https://github.com/apache/accumulo/pull/2524#discussion_r816928406



##########
File path: core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
##########
@@ -40,13 +44,92 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@SuppressFBWarnings(value = "RV_EXCEPTION_NOT_THROWN",
+    justification = "Throwing Error for it to be caught by AccumuloUncaughtExceptionHandler")
 public class ThreadPools {
 
+  public static class ExecutionError extends Error {
+
+    private static final long serialVersionUID = 1L;
+
+    public ExecutionError(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
   private static final Logger LOG = LoggerFactory.getLogger(ThreadPools.class);
 
   // the number of seconds before we allow a thread to terminate with non-use.
   public static final long DEFAULT_TIMEOUT_MILLISECS = 180000L;
 
+  private static final ThreadPoolExecutor SCHEDULED_FUTURE_CHECKER_POOL =
+      createFixedThreadPool(1, "Scheduled Future Checker", false);
+
+  private static final ConcurrentLinkedQueue<ScheduledFuture<?>> CRITICAL_RUNNING_TASKS =
+      new ConcurrentLinkedQueue<>();

Review comment:
       For non-critical tasks, we probably do not want to throw a runtime exception; we would probably just want to log something instead.
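One way to express that split is sketched below. It is an assumption-laden illustration rather than the PR's code: the class `TaskFailurePolicy`, the method `check`, and its string return values are hypothetical, a plain `Error` stands in for the PR's `ExecutionError`, and `java.util.logging` is used only to keep the sketch self-contained (the PR itself uses slf4j).

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.logging.Level;
import java.util.logging.Logger;

public class TaskFailurePolicy {

  private static final Logger LOG = Logger.getLogger(TaskFailurePolicy.class.getName());

  // Inspects a watched task without blocking: a failed critical task raises an
  // Error, while a failed non-critical task is only logged.
  static String check(Future<?> future, boolean critical) {
    if (!future.isDone()) {
      return "running";
    }
    try {
      future.get(); // does not block because the future is done
      return "completed";
    } catch (ExecutionException e) {
      if (critical) {
        throw new Error("Critical scheduled task failed.", e);
      }
      LOG.log(Level.SEVERE, "Non-critical scheduled task failed", e);
      return "logged";
    } catch (CancellationException e) {
      return "cancelled";
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return "interrupted";
    }
  }

  public static void main(String[] args) {
    CompletableFuture<Void> failed = new CompletableFuture<>();
    failed.completeExceptionally(new IllegalStateException("boom"));
    System.out.println(check(failed, false));
  }
}
```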







[GitHub] [accumulo] dlmarion commented on a change in pull request #2524: Beginning to address unchecked Futures

Posted by GitBox <gi...@apache.org>.
dlmarion commented on a change in pull request #2524:
URL: https://github.com/apache/accumulo/pull/2524#discussion_r817677951



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/CompactionExecutorsMetrics.java
##########
@@ -68,8 +69,9 @@ public CompactionExecutorsMetrics() {
         ThreadPools.createScheduledExecutorService(1, "compactionExecutorsMetricsPoller", false);
     Runtime.getRuntime().addShutdownHook(new Thread(scheduler::shutdownNow));
     long minimumRefreshDelay = TimeUnit.SECONDS.toMillis(5);
-    scheduler.scheduleAtFixedRate(this::update, minimumRefreshDelay, minimumRefreshDelay,
-        TimeUnit.MILLISECONDS);
+    @SuppressWarnings("unused")
+    ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(this::update, minimumRefreshDelay,

Review comment:
       Addressed in ab25541

##########
File path: server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java
##########
@@ -155,7 +156,8 @@ public void registerMetrics(final MeterRegistry registry) {
         ThreadPools.createScheduledExecutorService(1, "fateMetricsPoller", false);
     Runtime.getRuntime().addShutdownHook(new Thread(scheduler::shutdownNow));
 
-    scheduler.scheduleAtFixedRate(() -> {
+    @SuppressWarnings("unused")
+    ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(() -> {

Review comment:
       Addressed in ab25541

##########
File path: core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
##########
@@ -380,12 +382,16 @@ private void reschedule(SendTask task) {
         queue(mutations);
     };
 
-    threadPool.scheduleAtFixedRate(failureHandler, 250, 250, MILLISECONDS);
+    failureTaskFuture = threadPool.scheduleAtFixedRate(failureHandler, 250, 250, MILLISECONDS);
   }
 
   @Override
   public Iterator<Result> write(Iterator<ConditionalMutation> mutations) {
 
+    if (failureTaskFuture.isDone()) {
+      throw new RuntimeException("Background task that re-queues failed mutations has exited.");
+    }

Review comment:
       Addressed in ab25541



