You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/02/22 17:19:22 UTC
[26/28] incubator-nifi git commit: NIFI-362: Avoid continually
scheduling components to run if there is no work for them to do or if they
are yielded
NIFI-362: Avoid continually scheduling components to run if there is no work for them to do or if they are yielded
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/4cc106a5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/4cc106a5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/4cc106a5
Branch: refs/heads/nifi-site-to-site-client
Commit: 4cc106a54d9b6528e38cb99ecb15524a07a1f0c9
Parents: dde5fd5
Author: Mark Payne <ma...@hotmail.com>
Authored: Sun Feb 22 10:53:24 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Sun Feb 22 10:53:24 2015 -0500
----------------------------------------------------------------------
.../apache/nifi/controller/StandardFunnel.java | 2 +-
.../scheduling/QuartzSchedulingAgent.java | 21 +++-
.../controller/scheduling/ScheduleState.java | 18 ++--
.../scheduling/TimerDrivenSchedulingAgent.java | 105 ++++++++++++++++---
.../tasks/ContinuallyRunConnectableTask.java | 32 ++++--
.../tasks/ContinuallyRunProcessorTask.java | 32 +++---
6 files changed, 163 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4cc106a5/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
index e34e043..3bdfd20 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
@@ -94,7 +94,7 @@ public class StandardFunnel implements Funnel {
position = new AtomicReference<>(new Position(0D, 0D));
scheduledState = new AtomicReference<>(ScheduledState.STOPPED);
penalizationPeriod = new AtomicReference<>("30 sec");
- yieldPeriod = new AtomicReference<>("1 sec");
+ yieldPeriod = new AtomicReference<>("250 millis");
yieldExpiration = new AtomicLong(0L);
schedulingPeriod = new AtomicReference<>("0 millis");
schedulingNanos = new AtomicLong(30000);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4cc106a5/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
index ea67492..3355e73 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
@@ -21,6 +21,7 @@ import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -34,8 +35,9 @@ import org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask;
import org.apache.nifi.controller.tasks.ReportingTaskWrapper;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.engine.FlowEngine;
+import org.apache.nifi.processor.StandardProcessContext;
+import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.FormatUtils;
-
import org.quartz.CronExpression;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -130,13 +132,16 @@ public class QuartzSchedulingAgent implements SchedulingAgent {
final List<AtomicBoolean> triggers = new ArrayList<>();
for (int i = 0; i < connectable.getMaxConcurrentTasks(); i++) {
- final Runnable continuallyRunTask;
+ final Callable<Boolean> continuallyRunTask;
if (connectable.getConnectableType() == ConnectableType.PROCESSOR) {
final ProcessorNode procNode = (ProcessorNode) connectable;
- ContinuallyRunProcessorTask runnableTask = new ContinuallyRunProcessorTask(this, procNode, flowController, contextFactory, scheduleState, encryptor);
+
+ final StandardProcessContext standardProcContext = new StandardProcessContext(procNode, flowController, encryptor);
+ ContinuallyRunProcessorTask runnableTask = new ContinuallyRunProcessorTask(this, procNode, flowController, contextFactory, scheduleState, standardProcContext);
continuallyRunTask = runnableTask;
} else {
- continuallyRunTask = new ContinuallyRunConnectableTask(contextFactory, connectable, scheduleState, encryptor);
+ final ConnectableProcessContext connProcContext = new ConnectableProcessContext(connectable, encryptor);
+ continuallyRunTask = new ContinuallyRunConnectableTask(contextFactory, connectable, scheduleState, connProcContext);
}
final AtomicBoolean canceled = new AtomicBoolean(false);
@@ -147,7 +152,13 @@ public class QuartzSchedulingAgent implements SchedulingAgent {
return;
}
- continuallyRunTask.run();
+ try {
+ continuallyRunTask.call();
+ } catch (final RuntimeException re) {
+ throw re;
+ } catch (final Exception e) {
+ throw new ProcessException(e);
+ }
if (canceled.get()) {
return;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4cc106a5/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java
index eb5a437..ff17912 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java
@@ -16,9 +16,10 @@
*/
package org.apache.nifi.controller.scheduling;
-import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
-import java.util.List;
+import java.util.HashSet;
+import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -27,7 +28,7 @@ public class ScheduleState {
private final AtomicInteger activeThreadCount = new AtomicInteger(0);
private final AtomicBoolean scheduled = new AtomicBoolean(false);
- private final List<ScheduledFuture<?>> futures = new ArrayList<>();
+ private final Set<ScheduledFuture<?>> futures = new HashSet<ScheduledFuture<?>>();
private final AtomicBoolean mustCallOnStoppedMethods = new AtomicBoolean(false);
private volatile long lastStopTime = -1;
@@ -79,12 +80,17 @@ public class ScheduleState {
*
* @param newFutures
*/
- public void setFutures(final List<ScheduledFuture<?>> newFutures) {
+ public synchronized void setFutures(final Collection<ScheduledFuture<?>> newFutures) {
futures.clear();
futures.addAll(newFutures);
}
- public List<ScheduledFuture<?>> getFutures() {
- return Collections.unmodifiableList(futures);
+ public synchronized void replaceFuture(final ScheduledFuture<?> oldFuture, final ScheduledFuture<?> newFuture) {
+ futures.remove(oldFuture);
+ futures.add(newFuture);
+ }
+
+ public synchronized Set<ScheduledFuture<?>> getFutures() {
+ return Collections.unmodifiableSet(futures);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4cc106a5/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java
index db06151..efa8acd 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java
@@ -18,8 +18,10 @@ package org.apache.nifi.controller.scheduling;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.ConnectableType;
@@ -31,15 +33,17 @@ import org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask;
import org.apache.nifi.controller.tasks.ReportingTaskWrapper;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.engine.FlowEngine;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.StandardProcessContext;
+import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.FormatUtils;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TimerDrivenSchedulingAgent implements SchedulingAgent {
-
private static final Logger logger = LoggerFactory.getLogger(TimerDrivenSchedulingAgent.class);
-
+ private static final long NO_WORK_YIELD_NANOS = TimeUnit.MILLISECONDS.toNanos(10L);
+
private final FlowController flowController;
private final FlowEngine flowEngine;
private final ProcessContextFactory contextFactory;
@@ -72,20 +76,95 @@ public class TimerDrivenSchedulingAgent implements SchedulingAgent {
logger.info("{} started.", taskNode.getReportingTask());
}
+
@Override
public void schedule(final Connectable connectable, final ScheduleState scheduleState) {
- final Runnable runnable;
- if (connectable.getConnectableType() == ConnectableType.PROCESSOR) {
- final ProcessorNode procNode = (ProcessorNode) connectable;
- ContinuallyRunProcessorTask runnableTask = new ContinuallyRunProcessorTask(this, procNode, flowController, contextFactory, scheduleState, encryptor);
- runnable = runnableTask;
- } else {
- runnable = new ContinuallyRunConnectableTask(contextFactory, connectable, scheduleState, encryptor);
- }
-
+
final List<ScheduledFuture<?>> futures = new ArrayList<>();
for (int i = 0; i < connectable.getMaxConcurrentTasks(); i++) {
- final ScheduledFuture<?> future = flowEngine.scheduleWithFixedDelay(runnable, 0L, connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
+ final Callable<Boolean> continuallyRunTask;
+ final ProcessContext processContext;
+
+ // Determine the task to run and create it.
+ if (connectable.getConnectableType() == ConnectableType.PROCESSOR) {
+ final ProcessorNode procNode = (ProcessorNode) connectable;
+ final StandardProcessContext standardProcContext = new StandardProcessContext(procNode, flowController, encryptor);
+ final ContinuallyRunProcessorTask runnableTask = new ContinuallyRunProcessorTask(this, procNode, flowController,
+ contextFactory, scheduleState, standardProcContext);
+
+ continuallyRunTask = runnableTask;
+ processContext = standardProcContext;
+ } else {
+ processContext = new ConnectableProcessContext(connectable, encryptor);
+ continuallyRunTask = new ContinuallyRunConnectableTask(contextFactory, connectable, scheduleState, processContext);
+ }
+
+ final AtomicReference<ScheduledFuture<?>> futureRef = new AtomicReference<>();
+
+ final Runnable yieldDetectionRunnable = new Runnable() {
+ @Override
+ public void run() {
+ // Call the continually run task. It will return a boolean indicating whether or not we should yield
+ // based on a lack of work for to do for the component.
+ final boolean shouldYield;
+ try {
+ shouldYield = continuallyRunTask.call();
+ } catch (final RuntimeException re) {
+ throw re;
+ } catch (final Exception e) {
+ throw new ProcessException(e);
+ }
+
+ // If the component is yielded, cancel its future and re-submit it to run again
+ // after the yield has expired.
+ final long newYieldExpiration = connectable.getYieldExpiration();
+ if ( newYieldExpiration > System.currentTimeMillis() ) {
+ final long yieldMillis = System.currentTimeMillis() - newYieldExpiration;
+ final ScheduledFuture<?> scheduledFuture = futureRef.get();
+ if ( scheduledFuture == null ) {
+ return;
+ }
+
+ // If we are able to cancel the future, create a new one and update the ScheduleState so that it has
+ // an accurate accounting of which futures are outstanding; we must then also update the futureRef
+ // so that we can do this again the next time that the component is yielded.
+ if (scheduledFuture.cancel(false)) {
+ final long yieldNanos = TimeUnit.MILLISECONDS.toNanos(yieldMillis);
+ final ScheduledFuture<?> newFuture = flowEngine.scheduleWithFixedDelay(this, yieldNanos,
+ connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
+ scheduleState.replaceFuture(scheduledFuture, newFuture);
+ futureRef.set(newFuture);
+ }
+ } else if ( shouldYield ) {
+ // Component itself didn't yield but there was no work to do, so the framework will choose
+ // to yield the component automatically for a short period of time.
+ final ScheduledFuture<?> scheduledFuture = futureRef.get();
+ if ( scheduledFuture == null ) {
+ return;
+ }
+
+ // If we are able to cancel the future, create a new one and update the ScheduleState so that it has
+ // an accurate accounting of which futures are outstanding; we must then also update the futureRef
+ // so that we can do this again the next time that the component is yielded.
+ if (scheduledFuture.cancel(false)) {
+ final ScheduledFuture<?> newFuture = flowEngine.scheduleWithFixedDelay(this, NO_WORK_YIELD_NANOS,
+ connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
+ scheduleState.replaceFuture(scheduledFuture, newFuture);
+ futureRef.set(newFuture);
+ }
+ }
+ }
+ };
+
+ // Schedule the task to run
+ final ScheduledFuture<?> future = flowEngine.scheduleWithFixedDelay(yieldDetectionRunnable, 0L,
+ connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
+
+ // now that we have the future, set the atomic reference so that if the component is yielded we
+ // are able to then cancel this future.
+ futureRef.set(future);
+
+ // Keep track of the futures so that we can update the ScheduleState.
futures.add(future);
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4cc106a5/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java
index aca870b..408032c 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java
@@ -16,16 +16,16 @@
*/
package org.apache.nifi.controller.tasks;
+import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.controller.repository.StandardProcessSessionFactory;
-import org.apache.nifi.controller.scheduling.ConnectableProcessContext;
import org.apache.nifi.controller.scheduling.ProcessContextFactory;
import org.apache.nifi.controller.scheduling.ScheduleState;
-import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.nar.NarCloseable;
+import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.Connectables;
@@ -33,28 +33,33 @@ import org.apache.nifi.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ContinuallyRunConnectableTask implements Runnable {
+/**
+ * Continually runs a Connectable as long as the processor has work to do. {@link #call()} will return
+ * <code>true</code> if the Connectable should be yielded, <code>false</code> otherwise.
+ */
+public class ContinuallyRunConnectableTask implements Callable<Boolean> {
private static final Logger logger = LoggerFactory.getLogger(ContinuallyRunConnectableTask.class);
private final Connectable connectable;
private final ScheduleState scheduleState;
private final ProcessSessionFactory sessionFactory;
- private final ConnectableProcessContext processContext;
+ private final ProcessContext processContext;
- public ContinuallyRunConnectableTask(final ProcessContextFactory contextFactory, final Connectable connectable, final ScheduleState scheduleState, final StringEncryptor encryptor) {
+ public ContinuallyRunConnectableTask(final ProcessContextFactory contextFactory, final Connectable connectable, final ScheduleState scheduleState, final ProcessContext processContext) {
this.connectable = connectable;
this.scheduleState = scheduleState;
this.sessionFactory = new StandardProcessSessionFactory(contextFactory.newProcessContext(connectable, new AtomicLong(0L)));
- this.processContext = new ConnectableProcessContext(connectable, encryptor);
+ this.processContext = processContext;
}
- @SuppressWarnings("deprecation")
@Override
- public void run() {
+ @SuppressWarnings("deprecation")
+ public Boolean call() {
if (!scheduleState.isScheduled()) {
- return;
+ return false;
}
+
// Connectable should run if the following conditions are met:
// 1. It's an Input Port or or is a Remote Input Port or has incoming FlowFiles queued
// 2. Any relationship is available (since there's only 1
@@ -62,8 +67,9 @@ public class ContinuallyRunConnectableTask implements Runnable {
// it means the same thing)
// 3. It is not yielded.
final boolean triggerWhenEmpty = connectable.isTriggerWhenEmpty();
+ boolean flowFilesQueued = true;
final boolean shouldRun = (connectable.getYieldExpiration() < System.currentTimeMillis())
- && (triggerWhenEmpty || Connectables.flowFilesQueued(connectable)) && (connectable.getRelationships().isEmpty() || Connectables.anyRelationshipAvailable(connectable));
+ && (triggerWhenEmpty || (flowFilesQueued = Connectables.flowFilesQueued(connectable))) && (connectable.getRelationships().isEmpty() || Connectables.anyRelationshipAvailable(connectable));
if (shouldRun) {
scheduleState.incrementActiveThreadCount();
@@ -92,6 +98,12 @@ public class ContinuallyRunConnectableTask implements Runnable {
scheduleState.decrementActiveThreadCount();
}
+ } else if (!flowFilesQueued) {
+ // FlowFiles must be queued in order to run but there are none queued;
+ // yield for just a bit.
+ return true;
}
+
+ return true;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4cc106a5/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
index 33bd327..f4be855 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
@@ -17,6 +17,7 @@
package org.apache.nifi.controller.tasks;
import java.io.IOException;
+import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -31,7 +32,6 @@ import org.apache.nifi.controller.repository.StandardProcessSessionFactory;
import org.apache.nifi.controller.scheduling.ProcessContextFactory;
import org.apache.nifi.controller.scheduling.ScheduleState;
import org.apache.nifi.controller.scheduling.SchedulingAgent;
-import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.processor.ProcessSessionFactory;
@@ -43,7 +43,12 @@ import org.apache.nifi.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ContinuallyRunProcessorTask implements Runnable {
+
+/**
+ * Continually runs a processor as long as the processor has work to do. {@link #call()} will return
+ * <code>true</code> if the processor should be yielded, <code>false</code> otherwise.
+ */
+public class ContinuallyRunProcessorTask implements Callable<Boolean> {
private static final Logger logger = LoggerFactory.getLogger(ContinuallyRunProcessorTask.class);
@@ -56,7 +61,8 @@ public class ContinuallyRunProcessorTask implements Runnable {
private final int numRelationships;
public ContinuallyRunProcessorTask(final SchedulingAgent schedulingAgent, final ProcessorNode procNode,
- final FlowController flowController, final ProcessContextFactory contextFactory, final ScheduleState scheduleState, final StringEncryptor encryptor) {
+ final FlowController flowController, final ProcessContextFactory contextFactory, final ScheduleState scheduleState,
+ final StandardProcessContext processContext) {
this.schedulingAgent = schedulingAgent;
this.procNode = procNode;
@@ -65,28 +71,28 @@ public class ContinuallyRunProcessorTask implements Runnable {
this.flowController = flowController;
context = contextFactory.newProcessContext(procNode, new AtomicLong(0L));
- this.processContext = new StandardProcessContext(procNode, flowController, encryptor);
+ this.processContext = processContext;
}
- @SuppressWarnings("deprecation")
@Override
- public void run() {
+ @SuppressWarnings("deprecation")
+ public Boolean call() {
// make sure processor is not yielded
boolean shouldRun = (procNode.getYieldExpiration() < System.currentTimeMillis());
if (!shouldRun) {
- return;
+ return false;
}
// make sure that either we're not clustered or this processor runs on all nodes or that this is the primary node
shouldRun = !procNode.isIsolated() || !flowController.isClustered() || flowController.isPrimary();
if (!shouldRun) {
- return;
+ return false;
}
// make sure that either proc has incoming FlowFiles or has no incoming connections or is annotated with @TriggerWhenEmpty
shouldRun = procNode.isTriggerWhenEmpty() || !procNode.hasIncomingConnection() || Connectables.flowFilesQueued(procNode);
if (!shouldRun) {
- return;
+ return true;
}
if (numRelationships > 0) {
@@ -109,7 +115,7 @@ public class ContinuallyRunProcessorTask implements Runnable {
}
if (!shouldRun) {
- return;
+ return false;
}
scheduleState.incrementActiveThreadCount();
@@ -124,11 +130,11 @@ public class ContinuallyRunProcessorTask implements Runnable {
invocationCount++;
if (!batch) {
- return;
+ return false;
}
if (System.nanoTime() > finishNanos) {
- return;
+ return false;
}
shouldRun = procNode.isTriggerWhenEmpty() || !procNode.hasIncomingConnection() || Connectables.flowFilesQueued(procNode);
@@ -180,6 +186,8 @@ public class ContinuallyRunProcessorTask implements Runnable {
logger.error("", e);
}
}
+
+ return false;
}
}