You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2015/11/18 21:41:26 UTC

nifi git commit: NIFI-1168: Ensure that processors with only looping connections are scheduled to run, even if the connections have no FlowFiles; expose these details to processor developers; update documentation

Repository: nifi
Updated Branches:
  refs/heads/master 773576e04 -> 69bce2c2d


NIFI-1168: Ensure that processors with only looping
 connections are scheduled to run, even if the connections have no FlowFiles;
 expose these details to processor developers; update documentation

Signed-off-by: Aldrin Piri <al...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/69bce2c2
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/69bce2c2
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/69bce2c2

Branch: refs/heads/master
Commit: 69bce2c2db2619e0323434228c31f98630e43372
Parents: 773576e
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Nov 18 14:53:30 2015 -0500
Committer: Aldrin Piri <al...@apache.org>
Committed: Wed Nov 18 14:53:30 2015 -0500

----------------------------------------------------------------------
 .../annotation/behavior/TriggerWhenEmpty.java   | 26 +++++--
 .../apache/nifi/processor/ProcessContext.java   | 10 ++-
 .../apache/nifi/util/MockProcessContext.java    | 12 ++-
 .../nifi/util/StandardProcessorTestRunner.java  |  5 ++
 .../java/org/apache/nifi/util/TestRunner.java   | 10 ++-
 .../documentation/mock/MockProcessContext.java  |  5 ++
 .../scheduling/ConnectableProcessContext.java   |  6 ++
 .../tasks/ContinuallyRunProcessorTask.java      | 46 +++++++----
 .../nifi/processor/StandardProcessContext.java  |  6 ++
 .../processor/StandardSchedulingContext.java    |  5 ++
 .../java/org/apache/nifi/util/Connectables.java | 12 +++
 .../tasks/TestContinuallyRunProcessorTask.java  | 82 ++++++++++++++++++++
 .../nifi/processors/standard/ExecuteSQL.java    |  6 +-
 13 files changed, 207 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/69bce2c2/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/TriggerWhenEmpty.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/TriggerWhenEmpty.java b/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/TriggerWhenEmpty.java
index f442b80..6fac03b 100644
--- a/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/TriggerWhenEmpty.java
+++ b/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/TriggerWhenEmpty.java
@@ -24,14 +24,30 @@ import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
 
 /**
+ * <p>
  * Marker annotation a {@link org.apache.nifi.processor.Processor Processor}
  * implementation can use to indicate that the Processor should still be
- * triggered even when it has no data in its work queue. By default, Processors
- * which have no non-self incoming edges will be triggered even if there is no
- * work in its queue. However, Processors that have non-self incoming edges will
- * only be triggered if they have work in their queue or they present this
- * annotation.
+ * triggered even when it has no data in its work queue.
+ * </p>
  *
+ * <p>
+ * A Processor is scheduled to be triggered based on its configured Scheduling Period
+ * and Scheduling Strategy. However, when the scheduling period elapses, the Processor
+ * will not be scheduled if it has no work to do. Normally, a Processor is said to have
+ * work to do if one of the following circumstances is true:
+ * </p>
+ *
+ * <ul>
+ * <li>An incoming Connection has data in its queue</li>
+ * <li>The Processor has no incoming Connections.</li>
+ * <li>All incoming Connections are self-loops (both the source and destination of the Connection are the same Processor).
+ * </ul>
+ *
+ * <p>
+ * If the Processor needs to be triggered to run even when the above conditions are all
+ * <code>false</code>, the Processor's class can be annotated with this annotation, which
+ * will cause the Processor to be triggered, even if its incoming queues are empty.
+ * </p>
  */
 @Documented
 @Target({ElementType.TYPE})

http://git-wip-us.apache.org/repos/asf/nifi/blob/69bce2c2/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java
index bb69c65..7488b2d 100644
--- a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java
+++ b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java
@@ -133,9 +133,17 @@ public interface ProcessContext {
     boolean hasIncomingConnection();
 
     /**
+     * @return <code>true</code> if the processor has one or more incoming connections for
+     *         which the source of the connection is NOT the processor; returns <code>false</code> if
+     *         the processor has no incoming connections or if all incoming connections are self-loops
+     *         (i.e., the processor is also the source of all incoming connections).
+     */
+    boolean hasNonLoopConnection();
+
+    /**
      * @param relationship a relationship to check for connections
      * @return true if the relationship has one or more outbound connections,
-     * false otherwise
+     *         false otherwise
      */
     boolean hasConnection(Relationship relationship);
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/69bce2c2/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
index a350ecb..49021d1 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
@@ -48,7 +48,8 @@ public class MockProcessContext extends MockControllerServiceLookup implements S
     private boolean yieldCalled = false;
     private boolean enableExpressionValidation = false;
     private boolean allowExpressionValidation = true;
-    private boolean incomingConnection = true;
+    private volatile boolean incomingConnection = true;
+    private volatile boolean nonLoopConnection = true;
 
     private volatile Set<Relationship> connections = new HashSet<>();
     private volatile Set<Relationship> unavailableRelationships = new HashSet<>();
@@ -305,6 +306,15 @@ public class MockProcessContext extends MockControllerServiceLookup implements S
         return this.connections.contains(relationship);
     }
 
+    public void setNonLoopConnection(final boolean hasNonLoopConnection) {
+        this.nonLoopConnection = hasNonLoopConnection;
+    }
+
+    @Override
+    public boolean hasNonLoopConnection() {
+        return nonLoopConnection;
+    }
+
     public void addConnection(final Relationship relationship) {
         this.connections.add(relationship);
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/69bce2c2/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
index 0d00cc8..2f384ba 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
@@ -524,6 +524,11 @@ public class StandardProcessorTestRunner implements TestRunner {
     }
 
     @Override
+    public void setNonLoopConnection(final boolean hasNonLoopConnection) {
+        context.setNonLoopConnection(hasNonLoopConnection);
+    }
+
+    @Override
     public void addConnection(Relationship relationship) {
         context.addConnection(relationship);
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/69bce2c2/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
index ec901fe..b1e7c8c 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
@@ -492,11 +492,19 @@ public interface TestRunner {
      * Indicates to the framework that the configured processor has one or more
      * incoming connections.
      *
-     * @param hasIncomingConnection whether or not the configured processor has an incoming connection
+     * @param hasIncomingConnection whether or not the configured processor should behave as though it has an incoming connection
      */
     void setIncomingConnection(boolean hasIncomingConnection);
 
     /**
+     * Indicates to the framework that the configured processor has one or more incoming
+     * connections for which the processor is not also the source.
+     *
+     * @param hasNonLoopConnection whether or not the configured processor should behave as though it has a non-looping incoming connection
+     */
+    void setNonLoopConnection(boolean hasNonLoopConnection);
+
+    /**
      * Indicates to the Framework that the configured processor has a connection for the given Relationship.
      *
      * @param relationship that has a connection

http://git-wip-us.apache.org/repos/asf/nifi/blob/69bce2c2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessContext.java
index 3238d4a..38c9fc9 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessContext.java
@@ -89,6 +89,11 @@ public class MockProcessContext implements ProcessContext {
     }
 
     @Override
+    public boolean hasNonLoopConnection() {
+        return true;
+    }
+
+    @Override
     public boolean hasConnection(Relationship relationship) {
         return false;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/69bce2c2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
index da7162e..08e2504 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
@@ -37,6 +37,7 @@ import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.util.Connectables;
 
 /**
  * This class is essentially an empty shell for {@link Connectable}s that are not Processors
@@ -197,6 +198,11 @@ public class ConnectableProcessContext implements ProcessContext {
     }
 
     @Override
+    public boolean hasNonLoopConnection() {
+        return Connectables.hasNonLoopConnection(connectable);
+    }
+
+    @Override
     public boolean hasConnection(Relationship relationship) {
         Set<Connection> connections = connectable.getConnections(relationship);
         return connections != null && !connections.isEmpty();

http://git-wip-us.apache.org/repos/asf/nifi/blob/69bce2c2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
index d4d595b..dd12824 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
@@ -72,30 +72,45 @@ public class ContinuallyRunProcessorTask implements Callable<Boolean> {
         this.processContext = processContext;
     }
 
+    static boolean isRunOnCluster(final ProcessorNode procNode, final boolean isClustered, final boolean isPrimary) {
+        return !procNode.isIsolated() || !isClustered || isPrimary;
+    }
+
+    static boolean isYielded(final ProcessorNode procNode) {
+        return procNode.getYieldExpiration() >= System.currentTimeMillis();
+    }
+
+    static boolean isWorkToDo(final ProcessorNode procNode) {
+        return procNode.isTriggerWhenEmpty() || !procNode.hasIncomingConnection() || !Connectables.hasNonLoopConnection(procNode) || Connectables.flowFilesQueued(procNode);
+    }
+
     @Override
     @SuppressWarnings("deprecation")
     public Boolean call() {
         // make sure processor is not yielded
-        boolean shouldRun = (procNode.getYieldExpiration() < System.currentTimeMillis());
-        if (!shouldRun) {
+        if (isYielded(procNode)) {
             return false;
         }
 
         // make sure that either we're not clustered or this processor runs on all nodes or that this is the primary node
-        shouldRun = !procNode.isIsolated() || !flowController.isClustered() || flowController.isPrimary();
-        if (!shouldRun) {
+        if (!isRunOnCluster(procNode, flowController.isClustered(), flowController.isPrimary())) {
             return false;
         }
 
-        // make sure that either proc has incoming FlowFiles or has no incoming connections or is annotated with @TriggerWhenEmpty
-        shouldRun = procNode.isTriggerWhenEmpty() || !procNode.hasIncomingConnection() || Connectables.flowFilesQueued(procNode);
-        if (!shouldRun) {
+        // Make sure processor has work to do. This means that it meets one of these criteria:
+        // * It is annotated with @TriggerWhenEmpty
+        // * It has data in an incoming Connection
+        // * It has no incoming connections
+        // * All incoming connections are self-loops
+        if (!isWorkToDo(procNode)) {
             return true;
         }
 
         if (numRelationships > 0) {
             final int requiredNumberOfAvailableRelationships = procNode.isTriggerWhenAnyDestinationAvailable() ? 1 : numRelationships;
-            shouldRun = context.isRelationshipAvailabilitySatisfied(requiredNumberOfAvailableRelationships);
+            if (!context.isRelationshipAvailabilitySatisfied(requiredNumberOfAvailableRelationships)) {
+                return false;
+            }
         }
 
         final long batchNanos = procNode.getRunDuration(TimeUnit.NANOSECONDS);
@@ -112,10 +127,6 @@ public class ContinuallyRunProcessorTask implements Callable<Boolean> {
             batch = false;
         }
 
-        if (!shouldRun) {
-            return false;
-        }
-
         scheduleState.incrementActiveThreadCount();
 
         final long startNanos = System.nanoTime();
@@ -123,6 +134,7 @@ public class ContinuallyRunProcessorTask implements Callable<Boolean> {
         int invocationCount = 0;
         try {
             try (final AutoCloseable ncl = NarCloseable.withNarLoader()) {
+                boolean shouldRun = true;
                 while (shouldRun) {
                     procNode.onTrigger(processContext, sessionFactory);
                     invocationCount++;
@@ -135,10 +147,14 @@ public class ContinuallyRunProcessorTask implements Callable<Boolean> {
                         return false;
                     }
 
-                    shouldRun = procNode.isTriggerWhenEmpty() || !procNode.hasIncomingConnection() || Connectables.flowFilesQueued(procNode);
-                    shouldRun = shouldRun && (procNode.getYieldExpiration() < System.currentTimeMillis());
+                    if (!isWorkToDo(procNode)) {
+                        break;
+                    }
+                    if (isYielded(procNode)) {
+                        break;
+                    }
 
-                    if (shouldRun && numRelationships > 0) {
+                    if (numRelationships > 0) {
                         final int requiredNumberOfAvailableRelationships = procNode.isTriggerWhenAnyDestinationAvailable() ? 1 : numRelationships;
                         shouldRun = context.isRelationshipAvailabilitySatisfied(requiredNumberOfAvailableRelationships);
                     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/69bce2c2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
index 76849bd..a6302ca 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
@@ -32,6 +32,7 @@ import org.apache.nifi.controller.ControllerServiceLookup;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
 import org.apache.nifi.encrypt.StringEncryptor;
+import org.apache.nifi.util.Connectables;
 
 public class StandardProcessContext implements ProcessContext, ControllerServiceLookup {
 
@@ -185,6 +186,11 @@ public class StandardProcessContext implements ProcessContext, ControllerService
     }
 
     @Override
+    public boolean hasNonLoopConnection() {
+        return Connectables.hasNonLoopConnection(procNode);
+    }
+
+    @Override
     public boolean hasConnection(Relationship relationship) {
         Set<Connection> connections = procNode.getConnections(relationship);
         return connections != null && !connections.isEmpty();

http://git-wip-us.apache.org/repos/asf/nifi/blob/69bce2c2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
index 27d1264..a3c2a5d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
@@ -118,6 +118,11 @@ public class StandardSchedulingContext implements SchedulingContext {
     }
 
     @Override
+    public boolean hasNonLoopConnection() {
+        return processContext.hasNonLoopConnection();
+    }
+
+    @Override
     public boolean hasConnection(Relationship relationship) {
         return processContext.hasConnection(relationship);
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/69bce2c2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/Connectables.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/Connectables.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/Connectables.java
index c4d040b..5d74ebb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/Connectables.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/Connectables.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.util;
 
 import java.util.Collection;
+import java.util.List;
 
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.Connection;
@@ -53,4 +54,15 @@ public class Connectables {
 
         return false;
     }
+
+    public static boolean hasNonLoopConnection(final Connectable connectable) {
+        final List<Connection> connections = connectable.getIncomingConnections();
+        for (final Connection connection : connections) {
+            if (!connection.getSource().equals(connectable)) {
+                return true;
+            }
+        }
+
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/69bce2c2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestContinuallyRunProcessorTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestContinuallyRunProcessorTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestContinuallyRunProcessorTask.java
new file mode 100644
index 0000000..174e5fb
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestContinuallyRunProcessorTask.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.tasks;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestContinuallyRunProcessorTask {
+
+    @Test
+    public void testIsWorkToDo() {
+        System.setProperty("nifi.properties.file.path", "src/test/resources/nifi.properties");
+
+        final ProcessorNode procNode = Mockito.mock(ProcessorNode.class);
+        Mockito.when(procNode.hasIncomingConnection()).thenReturn(false);
+
+        // There is work to do because there are no incoming connections.
+        assertTrue(ContinuallyRunProcessorTask.isWorkToDo(procNode));
+
+        // Test with only a single connection that is self-looping and empty
+        final Connection selfLoopingConnection = Mockito.mock(Connection.class);
+        when(selfLoopingConnection.getSource()).thenReturn(procNode);
+        when(selfLoopingConnection.getDestination()).thenReturn(procNode);
+
+        when(procNode.hasIncomingConnection()).thenReturn(true);
+        when(procNode.getIncomingConnections()).thenReturn(Collections.singletonList(selfLoopingConnection));
+        assertTrue(ContinuallyRunProcessorTask.isWorkToDo(procNode));
+
+        // Test with only a single connection that is self-looping and empty
+        final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
+        when(flowFileQueue.isActiveQueueEmpty()).thenReturn(true);
+
+        final FlowFileQueue nonEmptyQueue = Mockito.mock(FlowFileQueue.class);
+        when(nonEmptyQueue.isActiveQueueEmpty()).thenReturn(false);
+
+        when(selfLoopingConnection.getFlowFileQueue()).thenReturn(nonEmptyQueue);
+        assertTrue(ContinuallyRunProcessorTask.isWorkToDo(procNode));
+
+
+        // Test with only a non-looping Connection that has no FlowFiles
+        final Connection emptyConnection = Mockito.mock(Connection.class);
+        when(emptyConnection.getSource()).thenReturn(Mockito.mock(ProcessorNode.class));
+        when(emptyConnection.getDestination()).thenReturn(procNode);
+
+        when(emptyConnection.getFlowFileQueue()).thenReturn(flowFileQueue);
+        when(procNode.getIncomingConnections()).thenReturn(Collections.singletonList(emptyConnection));
+        assertFalse(ContinuallyRunProcessorTask.isWorkToDo(procNode));
+
+        // test when the queue has data
+        final Connection nonEmptyConnection = Mockito.mock(Connection.class);
+        when(nonEmptyConnection.getSource()).thenReturn(Mockito.mock(ProcessorNode.class));
+        when(nonEmptyConnection.getDestination()).thenReturn(procNode);
+        when(nonEmptyConnection.getFlowFileQueue()).thenReturn(nonEmptyQueue);
+        when(procNode.getIncomingConnections()).thenReturn(Collections.singletonList(nonEmptyConnection));
+        assertTrue(ContinuallyRunProcessorTask.isWorkToDo(procNode));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/69bce2c2/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
index 452df42..9aa9d59 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
@@ -129,7 +129,11 @@ public class ExecuteSQL extends AbstractProcessor {
         FlowFile incoming = null;
         if (context.hasIncomingConnection()) {
             incoming = session.get();
-            if (incoming == null) {
+
+            // If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
+            // However, if we have no FlowFile and we have connections coming from other Processors, then
+            // we know that we should run only if we have a FlowFile.
+            if (incoming == null && context.hasNonLoopConnection()) {
                 return;
             }
         }