You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2019/04/03 20:12:31 UTC
[nifi] 13/17: NIFI-6155: Ensure that any task submitted to
FlowEngine catches Throwable so that the task doesn't just die silently
in the case of an unexpected error/exception
This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch NIFI-6169-RC2
in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 03e888ac20f79054c1a3a9ea1e36b52c07bbb77a
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Fri Mar 29 09:25:10 2019 -0400
NIFI-6155: Ensure that any task submitted to FlowEngine catches Throwable so that the task doesn't just die silently in the case of an unexpected error/exception
This closes #3395.
Signed-off-by: Bryan Bende <bb...@apache.org>
---
.../nifi/controller/tasks/ConnectableTask.java | 4 ++
.../java/org/apache/nifi/engine/FlowEngine.java | 50 ++++++++++++++++++++++
2 files changed, 54 insertions(+)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
index 5a49c72..180e2a2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
@@ -149,6 +149,8 @@ public class ConnectableTask {
}
public InvocationResult invoke() {
+ logger.trace("Triggering {}", connectable);
+
if (scheduleState.isTerminated()) {
return InvocationResult.DO_NOT_YIELD;
}
@@ -165,12 +167,14 @@ public class ConnectableTask {
// Make sure processor has work to do.
if (!isWorkToDo()) {
+ logger.debug("Yielding {} because it has no work to do", connectable);
return InvocationResult.yield("No work to do");
}
if (numRelationships > 0) {
final int requiredNumberOfAvailableRelationships = connectable.isTriggerWhenAnyDestinationAvailable() ? 1 : numRelationships;
if (!repositoryContext.isRelationshipAvailabilitySatisfied(requiredNumberOfAvailableRelationships)) {
+ logger.debug("Yielding {} because Backpressure is Applied", connectable);
return InvocationResult.yield("Backpressure Applied");
}
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/engine/FlowEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/engine/FlowEngine.java
index ffc463f..af63dcc 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/engine/FlowEngine.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/engine/FlowEngine.java
@@ -20,11 +20,14 @@ import org.apache.nifi.nar.NarThreadContextClassLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public final class FlowEngine extends ScheduledThreadPoolExecutor {
@@ -79,6 +82,53 @@ public final class FlowEngine extends ScheduledThreadPoolExecutor {
super.beforeExecute(thread, runnable);
}
+ @Override
+ public ScheduledFuture<?> schedule(final Runnable command, final long delay, final TimeUnit unit) {
+ return super.schedule(wrap(command), delay, unit);
+ }
+
+ @Override
+ public ScheduledFuture<?> scheduleAtFixedRate(final Runnable command, final long initialDelay, final long period, final TimeUnit unit) {
+ return super.scheduleAtFixedRate(wrap(command), initialDelay, period, unit);
+ }
+
+ @Override
+ public ScheduledFuture<?> scheduleWithFixedDelay(final Runnable command, final long initialDelay, final long delay, final TimeUnit unit) {
+ return super.scheduleWithFixedDelay(wrap(command), initialDelay, delay, unit);
+ }
+
+ @Override
+ public <V> ScheduledFuture<V> schedule(final Callable<V> callable, final long delay, final TimeUnit unit) {
+ return super.schedule(wrap(callable), delay, unit);
+ }
+
+ private Runnable wrap(final Runnable runnable) {
+ return new Runnable() {
+ @Override
+ public void run() {
+ try {
+ runnable.run();
+ } catch (final Throwable t) {
+ logger.error("Uncaught Exception in Runnable task", t);
+ }
+ }
+ };
+ }
+
+ private <T> Callable<T> wrap(final Callable<T> callable) {
+ return new Callable<T>() {
+ @Override
+ public T call() throws Exception {
+ try {
+ return callable.call();
+ } catch (final Throwable t) {
+ logger.error("Uncaught Exception in Callable task", t);
+ throw t;
+ }
+ }
+ };
+ }
+
/**
* Hook method called by the thread that executed the given runnable after execution of the runnable completed. Logs the fact of completion and any errors that might have occurred.
*