You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2015/11/18 21:41:26 UTC
nifi git commit: NIFI-1168: Ensure that processors with only looping
connections are scheduled to run, even if the connections have no FlowFiles;
expose these details to processor developers; update documentation
Repository: nifi
Updated Branches:
refs/heads/master 773576e04 -> 69bce2c2d
NIFI-1168: Ensure that processors with only looping
connections are scheduled to run, even if the connections have no FlowFiles;
expose these details to processor developers; update documentation
Signed-off-by: Aldrin Piri <al...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/69bce2c2
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/69bce2c2
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/69bce2c2
Branch: refs/heads/master
Commit: 69bce2c2db2619e0323434228c31f98630e43372
Parents: 773576e
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Nov 18 14:53:30 2015 -0500
Committer: Aldrin Piri <al...@apache.org>
Committed: Wed Nov 18 14:53:30 2015 -0500
----------------------------------------------------------------------
.../annotation/behavior/TriggerWhenEmpty.java | 26 +++++--
.../apache/nifi/processor/ProcessContext.java | 10 ++-
.../apache/nifi/util/MockProcessContext.java | 12 ++-
.../nifi/util/StandardProcessorTestRunner.java | 5 ++
.../java/org/apache/nifi/util/TestRunner.java | 10 ++-
.../documentation/mock/MockProcessContext.java | 5 ++
.../scheduling/ConnectableProcessContext.java | 6 ++
.../tasks/ContinuallyRunProcessorTask.java | 46 +++++++----
.../nifi/processor/StandardProcessContext.java | 6 ++
.../processor/StandardSchedulingContext.java | 5 ++
.../java/org/apache/nifi/util/Connectables.java | 12 +++
.../tasks/TestContinuallyRunProcessorTask.java | 82 ++++++++++++++++++++
.../nifi/processors/standard/ExecuteSQL.java | 6 +-
13 files changed, 207 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/69bce2c2/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/TriggerWhenEmpty.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/TriggerWhenEmpty.java b/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/TriggerWhenEmpty.java
index f442b80..6fac03b 100644
--- a/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/TriggerWhenEmpty.java
+++ b/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/TriggerWhenEmpty.java
@@ -24,14 +24,30 @@ import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
+ * <p>
* Marker annotation a {@link org.apache.nifi.processor.Processor Processor}
* implementation can use to indicate that the Processor should still be
- * triggered even when it has no data in its work queue. By default, Processors
- * which have no non-self incoming edges will be triggered even if there is no
- * work in its queue. However, Processors that have non-self incoming edges will
- * only be triggered if they have work in their queue or they present this
- * annotation.
+ * triggered even when it has no data in its work queue.
+ * </p>
*
+ * <p>
+ * A Processor is scheduled to be triggered based on its configured Scheduling Period
+ * and Scheduling Strategy. However, when the scheduling period elapses, the Processor
+ * will not be scheduled if it has no work to do. Normally, a Processor is said to have
+ * work to do if one of the following circumstances is true:
+ * </p>
+ *
+ * <ul>
+ * <li>An incoming Connection has data in its queue.</li>
+ * <li>The Processor has no incoming Connections.</li>
+ * <li>All incoming Connections are self-loops (both the source and destination of the Connection are the same Processor).</li>
+ * </ul>
+ *
+ * <p>
+ * If the Processor needs to be triggered to run even when the above conditions are all
+ * <code>false</code>, the Processor's class can be annotated with this annotation, which
+ * will cause the Processor to be triggered, even if its incoming queues are empty.
+ * </p>
*/
@Documented
@Target({ElementType.TYPE})
http://git-wip-us.apache.org/repos/asf/nifi/blob/69bce2c2/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java
index bb69c65..7488b2d 100644
--- a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java
+++ b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java
@@ -133,9 +133,17 @@ public interface ProcessContext {
boolean hasIncomingConnection();
/**
+ * @return <code>true</code> if the processor has one or more incoming connections for
+ * which the source of the connection is NOT the processor; returns <code>false</code> if
+ * the processor has no incoming connections or if all incoming connections are self-loops
+ * (i.e., the processor is also the source of all incoming connections).
+ */
+ boolean hasNonLoopConnection();
+
+ /**
* @param relationship a relationship to check for connections
* @return true if the relationship has one or more outbound connections,
- * false otherwise
+ * false otherwise
*/
boolean hasConnection(Relationship relationship);
http://git-wip-us.apache.org/repos/asf/nifi/blob/69bce2c2/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
index a350ecb..49021d1 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
@@ -48,7 +48,8 @@ public class MockProcessContext extends MockControllerServiceLookup implements S
private boolean yieldCalled = false;
private boolean enableExpressionValidation = false;
private boolean allowExpressionValidation = true;
- private boolean incomingConnection = true;
+ private volatile boolean incomingConnection = true;
+ private volatile boolean nonLoopConnection = true;
private volatile Set<Relationship> connections = new HashSet<>();
private volatile Set<Relationship> unavailableRelationships = new HashSet<>();
@@ -305,6 +306,15 @@ public class MockProcessContext extends MockControllerServiceLookup implements S
return this.connections.contains(relationship);
}
+ public void setNonLoopConnection(final boolean hasNonLoopConnection) {
+ this.nonLoopConnection = hasNonLoopConnection;
+ }
+
+ @Override
+ public boolean hasNonLoopConnection() {
+ return nonLoopConnection;
+ }
+
public void addConnection(final Relationship relationship) {
this.connections.add(relationship);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/69bce2c2/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
index 0d00cc8..2f384ba 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
@@ -524,6 +524,11 @@ public class StandardProcessorTestRunner implements TestRunner {
}
@Override
+ public void setNonLoopConnection(final boolean hasNonLoopConnection) {
+ context.setNonLoopConnection(hasNonLoopConnection);
+ }
+
+ @Override
public void addConnection(Relationship relationship) {
context.addConnection(relationship);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/69bce2c2/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
index ec901fe..b1e7c8c 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
@@ -492,11 +492,19 @@ public interface TestRunner {
* Indicates to the framework that the configured processor has one or more
* incoming connections.
*
- * @param hasIncomingConnection whether or not the configured processor has an incoming connection
+ * @param hasIncomingConnection whether or not the configured processor should behave as though it has an incoming connection
*/
void setIncomingConnection(boolean hasIncomingConnection);
/**
+ * Indicates to the framework that the configured processor has one or more incoming
+ * connections for which the processor is not also the source.
+ *
+ * @param hasNonLoopConnection whether or not the configured processor should behave as though it has a non-looping incoming connection
+ */
+ void setNonLoopConnection(boolean hasNonLoopConnection);
+
+ /**
* Indicates to the Framework that the configured processor has a connection for the given Relationship.
*
* @param relationship that has a connection
http://git-wip-us.apache.org/repos/asf/nifi/blob/69bce2c2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessContext.java
index 3238d4a..38c9fc9 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessContext.java
@@ -89,6 +89,11 @@ public class MockProcessContext implements ProcessContext {
}
@Override
+ public boolean hasNonLoopConnection() {
+ return true;
+ }
+
+ @Override
public boolean hasConnection(Relationship relationship) {
return false;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/69bce2c2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
index da7162e..08e2504 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
@@ -37,6 +37,7 @@ import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.util.Connectables;
/**
* This class is essentially an empty shell for {@link Connectable}s that are not Processors
@@ -197,6 +198,11 @@ public class ConnectableProcessContext implements ProcessContext {
}
@Override
+ public boolean hasNonLoopConnection() {
+ return Connectables.hasNonLoopConnection(connectable);
+ }
+
+ @Override
public boolean hasConnection(Relationship relationship) {
Set<Connection> connections = connectable.getConnections(relationship);
return connections != null && !connections.isEmpty();
http://git-wip-us.apache.org/repos/asf/nifi/blob/69bce2c2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
index d4d595b..dd12824 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
@@ -72,30 +72,45 @@ public class ContinuallyRunProcessorTask implements Callable<Boolean> {
this.processContext = processContext;
}
+ static boolean isRunOnCluster(final ProcessorNode procNode, final boolean isClustered, final boolean isPrimary) {
+ return !procNode.isIsolated() || !isClustered || isPrimary;
+ }
+
+ static boolean isYielded(final ProcessorNode procNode) {
+ return procNode.getYieldExpiration() >= System.currentTimeMillis();
+ }
+
+ static boolean isWorkToDo(final ProcessorNode procNode) {
+ return procNode.isTriggerWhenEmpty() || !procNode.hasIncomingConnection() || !Connectables.hasNonLoopConnection(procNode) || Connectables.flowFilesQueued(procNode);
+ }
+
@Override
@SuppressWarnings("deprecation")
public Boolean call() {
// make sure processor is not yielded
- boolean shouldRun = (procNode.getYieldExpiration() < System.currentTimeMillis());
- if (!shouldRun) {
+ if (isYielded(procNode)) {
return false;
}
// make sure that either we're not clustered or this processor runs on all nodes or that this is the primary node
- shouldRun = !procNode.isIsolated() || !flowController.isClustered() || flowController.isPrimary();
- if (!shouldRun) {
+ if (!isRunOnCluster(procNode, flowController.isClustered(), flowController.isPrimary())) {
return false;
}
- // make sure that either proc has incoming FlowFiles or has no incoming connections or is annotated with @TriggerWhenEmpty
- shouldRun = procNode.isTriggerWhenEmpty() || !procNode.hasIncomingConnection() || Connectables.flowFilesQueued(procNode);
- if (!shouldRun) {
+ // Make sure processor has work to do. This means that it meets one of these criteria:
+ // * It is annotated with @TriggerWhenEmpty
+ // * It has data in an incoming Connection
+ // * It has no incoming connections
+ // * All incoming connections are self-loops
+ if (!isWorkToDo(procNode)) {
return true;
}
if (numRelationships > 0) {
final int requiredNumberOfAvailableRelationships = procNode.isTriggerWhenAnyDestinationAvailable() ? 1 : numRelationships;
- shouldRun = context.isRelationshipAvailabilitySatisfied(requiredNumberOfAvailableRelationships);
+ if (!context.isRelationshipAvailabilitySatisfied(requiredNumberOfAvailableRelationships)) {
+ return false;
+ }
}
final long batchNanos = procNode.getRunDuration(TimeUnit.NANOSECONDS);
@@ -112,10 +127,6 @@ public class ContinuallyRunProcessorTask implements Callable<Boolean> {
batch = false;
}
- if (!shouldRun) {
- return false;
- }
-
scheduleState.incrementActiveThreadCount();
final long startNanos = System.nanoTime();
@@ -123,6 +134,7 @@ public class ContinuallyRunProcessorTask implements Callable<Boolean> {
int invocationCount = 0;
try {
try (final AutoCloseable ncl = NarCloseable.withNarLoader()) {
+ boolean shouldRun = true;
while (shouldRun) {
procNode.onTrigger(processContext, sessionFactory);
invocationCount++;
@@ -135,10 +147,14 @@ public class ContinuallyRunProcessorTask implements Callable<Boolean> {
return false;
}
- shouldRun = procNode.isTriggerWhenEmpty() || !procNode.hasIncomingConnection() || Connectables.flowFilesQueued(procNode);
- shouldRun = shouldRun && (procNode.getYieldExpiration() < System.currentTimeMillis());
+ if (!isWorkToDo(procNode)) {
+ break;
+ }
+ if (isYielded(procNode)) {
+ break;
+ }
- if (shouldRun && numRelationships > 0) {
+ if (numRelationships > 0) {
final int requiredNumberOfAvailableRelationships = procNode.isTriggerWhenAnyDestinationAvailable() ? 1 : numRelationships;
shouldRun = context.isRelationshipAvailabilitySatisfied(requiredNumberOfAvailableRelationships);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/69bce2c2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
index 76849bd..a6302ca 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
@@ -32,6 +32,7 @@ import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.encrypt.StringEncryptor;
+import org.apache.nifi.util.Connectables;
public class StandardProcessContext implements ProcessContext, ControllerServiceLookup {
@@ -185,6 +186,11 @@ public class StandardProcessContext implements ProcessContext, ControllerService
}
@Override
+ public boolean hasNonLoopConnection() {
+ return Connectables.hasNonLoopConnection(procNode);
+ }
+
+ @Override
public boolean hasConnection(Relationship relationship) {
Set<Connection> connections = procNode.getConnections(relationship);
return connections != null && !connections.isEmpty();
http://git-wip-us.apache.org/repos/asf/nifi/blob/69bce2c2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
index 27d1264..a3c2a5d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
@@ -118,6 +118,11 @@ public class StandardSchedulingContext implements SchedulingContext {
}
@Override
+ public boolean hasNonLoopConnection() {
+ return processContext.hasNonLoopConnection();
+ }
+
+ @Override
public boolean hasConnection(Relationship relationship) {
return processContext.hasConnection(relationship);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/69bce2c2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/Connectables.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/Connectables.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/Connectables.java
index c4d040b..5d74ebb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/Connectables.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/Connectables.java
@@ -17,6 +17,7 @@
package org.apache.nifi.util;
import java.util.Collection;
+import java.util.List;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Connection;
@@ -53,4 +54,15 @@ public class Connectables {
return false;
}
+
+ public static boolean hasNonLoopConnection(final Connectable connectable) {
+ final List<Connection> connections = connectable.getIncomingConnections();
+ for (final Connection connection : connections) {
+ if (!connection.getSource().equals(connectable)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/69bce2c2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestContinuallyRunProcessorTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestContinuallyRunProcessorTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestContinuallyRunProcessorTask.java
new file mode 100644
index 0000000..174e5fb
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestContinuallyRunProcessorTask.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.tasks;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestContinuallyRunProcessorTask {
+
+ @Test
+ public void testIsWorkToDo() {
+ System.setProperty("nifi.properties.file.path", "src/test/resources/nifi.properties");
+
+ final ProcessorNode procNode = Mockito.mock(ProcessorNode.class);
+ Mockito.when(procNode.hasIncomingConnection()).thenReturn(false);
+
+ // There is work to do because there are no incoming connections.
+ assertTrue(ContinuallyRunProcessorTask.isWorkToDo(procNode));
+
+ // Test with only a single connection that is self-looping and empty
+ final Connection selfLoopingConnection = Mockito.mock(Connection.class);
+ when(selfLoopingConnection.getSource()).thenReturn(procNode);
+ when(selfLoopingConnection.getDestination()).thenReturn(procNode);
+
+ when(procNode.hasIncomingConnection()).thenReturn(true);
+ when(procNode.getIncomingConnections()).thenReturn(Collections.singletonList(selfLoopingConnection));
+ assertTrue(ContinuallyRunProcessorTask.isWorkToDo(procNode));
+
+ // Test with only a single connection that is self-looping and non-empty
+ final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
+ when(flowFileQueue.isActiveQueueEmpty()).thenReturn(true);
+
+ final FlowFileQueue nonEmptyQueue = Mockito.mock(FlowFileQueue.class);
+ when(nonEmptyQueue.isActiveQueueEmpty()).thenReturn(false);
+
+ when(selfLoopingConnection.getFlowFileQueue()).thenReturn(nonEmptyQueue);
+ assertTrue(ContinuallyRunProcessorTask.isWorkToDo(procNode));
+
+
+ // Test with only a non-looping Connection that has no FlowFiles
+ final Connection emptyConnection = Mockito.mock(Connection.class);
+ when(emptyConnection.getSource()).thenReturn(Mockito.mock(ProcessorNode.class));
+ when(emptyConnection.getDestination()).thenReturn(procNode);
+
+ when(emptyConnection.getFlowFileQueue()).thenReturn(flowFileQueue);
+ when(procNode.getIncomingConnections()).thenReturn(Collections.singletonList(emptyConnection));
+ assertFalse(ContinuallyRunProcessorTask.isWorkToDo(procNode));
+
+ // Test with only a non-looping Connection that has FlowFiles queued
+ final Connection nonEmptyConnection = Mockito.mock(Connection.class);
+ when(nonEmptyConnection.getSource()).thenReturn(Mockito.mock(ProcessorNode.class));
+ when(nonEmptyConnection.getDestination()).thenReturn(procNode);
+ when(nonEmptyConnection.getFlowFileQueue()).thenReturn(nonEmptyQueue);
+ when(procNode.getIncomingConnections()).thenReturn(Collections.singletonList(nonEmptyConnection));
+ assertTrue(ContinuallyRunProcessorTask.isWorkToDo(procNode));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/69bce2c2/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
index 452df42..9aa9d59 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
@@ -129,7 +129,11 @@ public class ExecuteSQL extends AbstractProcessor {
FlowFile incoming = null;
if (context.hasIncomingConnection()) {
incoming = session.get();
- if (incoming == null) {
+
+ // If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
+ // However, if we have no FlowFile and we have connections coming from other Processors, then
+ // we know that we should run only if we have a FlowFile.
+ if (incoming == null && context.hasNonLoopConnection()) {
return;
}
}