You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2019/04/03 20:12:31 UTC

[nifi] 13/17: NIFI-6155: Ensure that any task submitted to FlowEngine catches Throwable so that the task doesn't die just die silently in the case of an unexpected error/exception

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch NIFI-6169-RC2
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 03e888ac20f79054c1a3a9ea1e36b52c07bbb77a
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Fri Mar 29 09:25:10 2019 -0400

    NIFI-6155: Ensure that any task submitted to FlowEngine catches Throwable so that the task doesn't die just die silently in the case of an unexpected error/exception
    
    This closes #3395.
    
    Signed-off-by: Bryan Bende <bb...@apache.org>
---
 .../nifi/controller/tasks/ConnectableTask.java     |  4 ++
 .../java/org/apache/nifi/engine/FlowEngine.java    | 50 ++++++++++++++++++++++
 2 files changed, 54 insertions(+)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
index 5a49c72..180e2a2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
@@ -149,6 +149,8 @@ public class ConnectableTask {
     }
 
     public InvocationResult invoke() {
+        logger.trace("Triggering {}", connectable);
+
         if (scheduleState.isTerminated()) {
             return InvocationResult.DO_NOT_YIELD;
         }
@@ -165,12 +167,14 @@ public class ConnectableTask {
 
         // Make sure processor has work to do.
         if (!isWorkToDo()) {
+            logger.debug("Yielding {} because it has no work to do", connectable);
             return InvocationResult.yield("No work to do");
         }
 
         if (numRelationships > 0) {
             final int requiredNumberOfAvailableRelationships = connectable.isTriggerWhenAnyDestinationAvailable() ? 1 : numRelationships;
             if (!repositoryContext.isRelationshipAvailabilitySatisfied(requiredNumberOfAvailableRelationships)) {
+                logger.debug("Yielding {} because Backpressure is Applied", connectable);
                 return InvocationResult.yield("Backpressure Applied");
             }
         }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/engine/FlowEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/engine/FlowEngine.java
index ffc463f..af63dcc 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/engine/FlowEngine.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/engine/FlowEngine.java
@@ -20,11 +20,14 @@ import org.apache.nifi.nar.NarThreadContextClassLoader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.FutureTask;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public final class FlowEngine extends ScheduledThreadPoolExecutor {
@@ -79,6 +82,53 @@ public final class FlowEngine extends ScheduledThreadPoolExecutor {
         super.beforeExecute(thread, runnable);
     }
 
+    @Override
+    public ScheduledFuture<?> schedule(final Runnable command, final long delay, final TimeUnit unit) {
+        return super.schedule(wrap(command), delay, unit);
+    }
+
+    @Override
+    public ScheduledFuture<?> scheduleAtFixedRate(final Runnable command, final long initialDelay, final long period, final TimeUnit unit) {
+        return super.scheduleAtFixedRate(wrap(command), initialDelay, period, unit);
+    }
+
+    @Override
+    public ScheduledFuture<?> scheduleWithFixedDelay(final Runnable command, final long initialDelay, final long delay, final TimeUnit unit) {
+        return super.scheduleWithFixedDelay(wrap(command), initialDelay, delay, unit);
+    }
+
+    @Override
+    public <V> ScheduledFuture<V> schedule(final Callable<V> callable, final long delay, final TimeUnit unit) {
+        return super.schedule(wrap(callable), delay, unit);
+    }
+
+    private Runnable wrap(final Runnable runnable) {
+        return new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    runnable.run();
+                } catch (final Throwable t) {
+                    logger.error("Uncaught Exception in Runnable task", t);
+                }
+            }
+        };
+    }
+
+    private <T> Callable<T> wrap(final Callable<T> callable) {
+        return new Callable<T>() {
+            @Override
+            public T call() throws Exception {
+                try {
+                    return callable.call();
+                } catch (final Throwable t) {
+                    logger.error("Uncaught Exception in Callable task", t);
+                    throw t;
+                }
+            }
+        };
+    }
+
     /**
      * Hook method called by the thread that executed the given runnable after execution of the runnable completed. Logs the fact of completion and any errors that might have occurred.
      *