You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2018/04/23 14:37:32 UTC

nifi git commit: NIFI-5075: Do not execute Funnels with no outgoing connections

Repository: nifi
Updated Branches:
  refs/heads/master 279921194 -> 8e4aa6bf2


NIFI-5075: Do not execute Funnels with no outgoing connections

- Added dedicated conditions for Funnels
- Fixed stale Javadoc
- Stopped caching hasNonLoopConnection variable
- Grouped some conditions to isSourceComponent variable for better
readability

This closes #2634.

Signed-off-by: Mark Payne <ma...@hotmail.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/8e4aa6bf
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/8e4aa6bf
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/8e4aa6bf

Branch: refs/heads/master
Commit: 8e4aa6bf22ec216824db49fdfaa684e4fec9cb6a
Parents: 2799211
Author: Koji Kawamura <ij...@apache.org>
Authored: Fri Apr 13 11:18:24 2018 +0900
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Apr 23 10:37:22 2018 -0400

----------------------------------------------------------------------
 .../nifi/controller/tasks/ConnectableTask.java  | 46 ++++++++--
 .../controller/tasks/TestConnectableTask.java   | 93 +++++++++++++++++---
 2 files changed, 120 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/8e4aa6bf/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
index b0ba446..1a98ee3 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.ConnectableType;
 import org.apache.nifi.controller.FlowController;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.ScheduledState;
@@ -51,7 +52,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Continually runs a processor as long as the processor has work to do. {@link #call()} will return <code>true</code> if the processor should be yielded, <code>false</code> otherwise.
+ * Continually runs a <code>{@link Connectable}</code> component as long as the component has work to do.
+ * {@link #invoke()} ()} will return <code>{@link InvocationResult}</code> telling if the component should be yielded.
  */
 public class ConnectableTask {
 
@@ -64,7 +66,6 @@ public class ConnectableTask {
     private final ProcessContext processContext;
     private final FlowController flowController;
     private final int numRelationships;
-    private final boolean hasNonLoopConnection;
 
 
     public ConnectableTask(final SchedulingAgent schedulingAgent, final Connectable connectable,
@@ -76,7 +77,6 @@ public class ConnectableTask {
         this.scheduleState = scheduleState;
         this.numRelationships = connectable.getRelationships().size();
         this.flowController = flowController;
-        this.hasNonLoopConnection = Connectables.hasNonLoopConnection(connectable);
 
         final StateManager stateManager = new TaskTerminationAwareStateManager(flowController.getStateManagerProvider().getStateManager(connectable.getIdentifier()), scheduleState::isTerminated);
         if (connectable instanceof ProcessorNode) {
@@ -103,8 +103,40 @@ public class ConnectableTask {
         return connectable.getYieldExpiration() > System.currentTimeMillis();
     }
 
+    /**
+     * Make sure processor has work to do. This means that it meets one of these criteria:
+     * <ol>
+     * <li>It is a Funnel and has incoming FlowFiles from other components, and and at least one outgoing connection.</li>
+     * <li>It is a 'source' component, meaning:<ul>
+     *   <li>It is annotated with @TriggerWhenEmpty</li>
+     *   <li>It has no incoming connections</li>
+     *   <li>All incoming connections are self-loops</li>
+     * </ul></li>
+     * <li>It has data in incoming connections to process</li>
+     * </ol>
+     * @return true if there is work to do, otherwise false
+     */
     private boolean isWorkToDo() {
-        return connectable.isTriggerWhenEmpty() || !connectable.hasIncomingConnection() || !hasNonLoopConnection || Connectables.flowFilesQueued(connectable);
+        boolean hasNonLoopConnection = Connectables.hasNonLoopConnection(connectable);
+
+        if (connectable.getConnectableType() == ConnectableType.FUNNEL) {
+            // Handle Funnel as a special case because it will never be a 'source' component,
+            // and also its outgoing connections can not be terminated.
+            // Incoming FlowFiles from other components, and at least one outgoing connection are required.
+            return connectable.hasIncomingConnection()
+                    && hasNonLoopConnection
+                    && !connectable.getConnections().isEmpty()
+                    && Connectables.flowFilesQueued(connectable);
+        }
+
+        final boolean isSourceComponent = connectable.isTriggerWhenEmpty()
+                // No input connections
+                || !connectable.hasIncomingConnection()
+                // Every incoming connection loops back to itself, no inputs from other components
+                || !hasNonLoopConnection;
+
+        // If it is not a 'source' component, it requires a FlowFile to process.
+        return isSourceComponent || Connectables.flowFilesQueued(connectable);
     }
 
     private boolean isBackPressureEngaged() {
@@ -129,11 +161,7 @@ public class ConnectableTask {
             return InvocationResult.DO_NOT_YIELD;
         }
 
-        // Make sure processor has work to do. This means that it meets one of these criteria:
-        // * It is annotated with @TriggerWhenEmpty
-        // * It has data in an incoming Connection
-        // * It has no incoming connections
-        // * All incoming connections are self-loops
+        // Make sure processor has work to do.
         if (!isWorkToDo()) {
             return InvocationResult.yield("No work to do");
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/8e4aa6bf/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestConnectableTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestConnectableTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestConnectableTask.java
index 3ff9580..7214b80 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestConnectableTask.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestConnectableTask.java
@@ -21,12 +21,17 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.when;
 
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.nifi.components.state.StateManagerProvider;
 import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.ConnectableType;
 import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.connectable.Funnel;
 import org.apache.nifi.controller.FlowController;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.queue.FlowFileQueue;
@@ -41,15 +46,9 @@ import org.junit.Test;
 import org.mockito.Mockito;
 
 public class TestConnectableTask {
-    @Test
-    public void testIsWorkToDo() {
-        final ProcessorNode procNode = Mockito.mock(ProcessorNode.class);
-        Mockito.when(procNode.hasIncomingConnection()).thenReturn(false);
 
-        final Processor processor = Mockito.mock(Processor.class);
-        Mockito.when(procNode.getIdentifier()).thenReturn("123");
-        Mockito.when(procNode.getRunnableComponent()).thenReturn(processor);
 
+    private ConnectableTask createTask(final Connectable connectable) {
         final FlowController flowController = Mockito.mock(FlowController.class);
         Mockito.when(flowController.getStateManagerProvider()).thenReturn(Mockito.mock(StateManagerProvider.class));
 
@@ -61,9 +60,22 @@ public class TestConnectableTask {
 
         final LifecycleState scheduleState = new LifecycleState();
         final StringEncryptor encryptor = Mockito.mock(StringEncryptor.class);
-        ConnectableTask task = new ConnectableTask(Mockito.mock(SchedulingAgent.class), procNode, flowController, contextFactory, scheduleState, encryptor);
+
+        return new ConnectableTask(Mockito.mock(SchedulingAgent.class), connectable,
+                flowController, contextFactory, scheduleState, encryptor);
+    }
+
+    @Test
+    public void testIsWorkToDo() {
+        final ProcessorNode procNode = Mockito.mock(ProcessorNode.class);
+        Mockito.when(procNode.hasIncomingConnection()).thenReturn(false);
+
+        final Processor processor = Mockito.mock(Processor.class);
+        Mockito.when(procNode.getIdentifier()).thenReturn("123");
+        Mockito.when(procNode.getRunnableComponent()).thenReturn(processor);
 
         // There is work to do because there are no incoming connections.
+        final ConnectableTask task = createTask(procNode);
         assertFalse(task.invoke().isYield());
 
         // Test with only a single connection that is self-looping and empty
@@ -93,8 +105,6 @@ public class TestConnectableTask {
         when(emptyConnection.getFlowFileQueue()).thenReturn(flowFileQueue);
         when(procNode.getIncomingConnections()).thenReturn(Collections.singletonList(emptyConnection));
 
-        // Create a new ConnectableTask because we want to have a different value for the 'hasNonLoopConnection' value, which is calculated in the task's constructor.
-        task = new ConnectableTask(Mockito.mock(SchedulingAgent.class), procNode, flowController, contextFactory, scheduleState, encryptor);
         assertTrue(task.invoke().isYield());
 
         // test when the queue has data
@@ -106,4 +116,67 @@ public class TestConnectableTask {
         assertFalse(task.invoke().isYield());
     }
 
+    @Test
+    public void testIsWorkToDoFunnels() {
+        final Funnel funnel = Mockito.mock(Funnel.class);
+        Mockito.when(funnel.hasIncomingConnection()).thenReturn(false);
+        Mockito.when(funnel.getRunnableComponent()).thenReturn(funnel);
+        Mockito.when(funnel.getConnectableType()).thenReturn(ConnectableType.FUNNEL);
+        Mockito.when(funnel.getIdentifier()).thenReturn("funnel-1");
+
+        final ConnectableTask task = createTask(funnel);
+        assertTrue("If there is no incoming connection, it should be yielded.", task.invoke().isYield());
+
+        // Test with only a single connection that is self-looping and empty.
+        // Actually, this self-loop input can not be created for Funnels using NiFi API because an outer layer check condition does not allow it.
+        // But test it anyways.
+        final Connection selfLoopingConnection = Mockito.mock(Connection.class);
+        when(selfLoopingConnection.getSource()).thenReturn(funnel);
+        when(selfLoopingConnection.getDestination()).thenReturn(funnel);
+
+        when(funnel.hasIncomingConnection()).thenReturn(true);
+        when(funnel.getIncomingConnections()).thenReturn(Collections.singletonList(selfLoopingConnection));
+
+        final FlowFileQueue emptyQueue = Mockito.mock(FlowFileQueue.class);
+        when(emptyQueue.isActiveQueueEmpty()).thenReturn(true);
+        when(selfLoopingConnection.getFlowFileQueue()).thenReturn(emptyQueue);
+
+        final Set<Connection> outgoingConnections = new HashSet<>();
+        outgoingConnections.add(selfLoopingConnection);
+        when(funnel.getConnections()).thenReturn(outgoingConnections);
+
+        assertTrue("If there is no incoming connection from other components, it should be yielded.", task.invoke().isYield());
+
+        // Add an incoming connection from another component.
+        final ProcessorNode inputProcessor = Mockito.mock(ProcessorNode.class);
+        final Connection incomingFromAnotherComponent = Mockito.mock(Connection.class);
+        when(incomingFromAnotherComponent.getSource()).thenReturn(inputProcessor);
+        when(incomingFromAnotherComponent.getDestination()).thenReturn(funnel);
+        when(incomingFromAnotherComponent.getFlowFileQueue()).thenReturn(emptyQueue);
+
+        when(funnel.hasIncomingConnection()).thenReturn(true);
+        when(funnel.getIncomingConnections()).thenReturn(Arrays.asList(selfLoopingConnection, incomingFromAnotherComponent));
+
+        assertTrue("Even if there is an incoming connection from another component," +
+                " it should be yielded because there's no outgoing connections.", task.invoke().isYield());
+
+        // Add an outgoing connection to another component.
+        final ProcessorNode outputProcessor = Mockito.mock(ProcessorNode.class);
+        final Connection outgoingToAnotherComponent = Mockito.mock(Connection.class);
+        when(outgoingToAnotherComponent.getSource()).thenReturn(funnel);
+        when(outgoingToAnotherComponent.getDestination()).thenReturn(outputProcessor);
+        outgoingConnections.add(outgoingToAnotherComponent);
+
+        assertTrue("Even if there is an incoming connection from another component and an outgoing connection as well," +
+                " it should be yielded because there's no incoming FlowFiles to process.", task.invoke().isYield());
+
+        // Adding input FlowFiles.
+        final FlowFileQueue nonEmptyQueue = Mockito.mock(FlowFileQueue.class);
+        when(nonEmptyQueue.isActiveQueueEmpty()).thenReturn(false);
+        when(incomingFromAnotherComponent.getFlowFileQueue()).thenReturn(nonEmptyQueue);
+        assertFalse("When a Funnel has both incoming and outgoing connections and FlowFiles to process, then it should be executed.",
+                task.invoke().isYield());
+
+    }
+
 }