Posted to commits@nifi.apache.org by jg...@apache.org on 2021/12/05 11:40:14 UTC

[nifi] branch main updated: NIFI-9442: When deleting a connection, ensure that when the source of the connection is a funnel that its upstream components are checked (recursively)

This is an automated email from the ASF dual-hosted git repository.

jgresock pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 2a3f7da  NIFI-9442: When deleting a connection, ensure that when the source of the connection is a funnel that its upstream components are checked (recursively)
2a3f7da is described below

commit 2a3f7dafd657a8f1a2d638688889710df3ef8acd
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Fri Dec 3 14:14:03 2021 -0500

    NIFI-9442: When deleting a connection, ensure that when the source of the connection is a funnel that its upstream components are checked (recursively)
    
    Signed-off-by: Joe Gresock <jg...@gmail.com>
    
    This closes #5568.
---
 .../nifi/connectable/StandardConnection.java       | 33 ++++++++++++++++++----
 1 file changed, 28 insertions(+), 5 deletions(-)
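
Note: the recursive check described in the commit message can be illustrated outside of NiFi with a minimal, self-contained sketch. The types below (Kind, Node, Edge) and the canDelete helper are hypothetical stand-ins for NiFi's ConnectableType, Connectable and Connection; they are not part of the NiFi API, and the sketch only mirrors the idea of the change, not its exact code.

```java
import java.util.ArrayList;
import java.util.List;

public class FunnelCheckSketch {

    // Hypothetical stand-in for NiFi's ConnectableType (assumption, not the real enum).
    enum Kind { PROCESSOR, FUNNEL }

    // Hypothetical stand-in for a Connectable: a component with incoming connections.
    static final class Node {
        final String name;
        final Kind kind;
        final boolean running;
        final List<Edge> incoming = new ArrayList<>();

        Node(final String name, final Kind kind, final boolean running) {
            this.name = name;
            this.kind = kind;
            this.running = running;
        }
    }

    // Hypothetical stand-in for a Connection: registers itself as incoming on its destination.
    static final class Edge {
        final Node source;

        Edge(final Node source, final Node destination) {
            this.source = source;
            destination.incoming.add(this);
        }
    }

    // A stopped source is fine. A running non-funnel source is rejected.
    // A running funnel is accepted only if everything upstream of it passes the same check.
    // Like the idea it mirrors, this sketch has no guard against cycles made only of funnels.
    static void verifySourceStoppedOrFunnel(final Edge connection) {
        final Node sourceComponent = connection.source;
        if (!sourceComponent.running) {
            return;
        }

        if (sourceComponent.kind != Kind.FUNNEL) {
            throw new IllegalStateException("Upstream component (" + sourceComponent.name + ") is running");
        }

        // Walk the incoming connections of the component under inspection.
        for (final Edge incoming : sourceComponent.incoming) {
            verifySourceStoppedOrFunnel(incoming);
        }
    }

    // Convenience wrapper for the sketch: true if the connection could be deleted.
    static boolean canDelete(final Edge connection) {
        try {
            verifySourceStoppedOrFunnel(connection);
            return true;
        } catch (final IllegalStateException e) {
            return false;
        }
    }

    public static void main(final String[] args) {
        final Node processor = new Node("GenerateData", Kind.PROCESSOR, true);
        final Node funnel = new Node("Funnel", Kind.FUNNEL, true);
        final Node sink = new Node("Sink", Kind.PROCESSOR, false);

        new Edge(processor, funnel);
        final Edge funnelToSink = new Edge(funnel, sink);

        // The funnel's upstream processor is still running, so deletion is refused.
        System.out.println(canDelete(funnelToSink)); // prints "false"
    }
}
```

With the upstream processor stopped, the same call returns true, which is the case in which every node in a cluster can pass both the verify phase and the delete phase consistently.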

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/connectable/StandardConnection.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/connectable/StandardConnection.java
index 1cf78b6..a866c32 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/connectable/StandardConnection.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/connectable/StandardConnection.java
@@ -497,11 +497,16 @@ public final class StandardConnection implements Connection, ConnectionEventList
             throw new IllegalStateException("Queue not empty for " + this.getIdentifier());
         }
 
-        if (source.isRunning()) {
-            if (!ConnectableType.FUNNEL.equals(source.getConnectableType())) {
-                throw new IllegalStateException("Source of Connection (" + source.getIdentifier() + ") is running");
-            }
-        }
+        // The source must be stopped unless it is a Funnel. Funnels cannot be stopped & started. But if the source is a Funnel,
+        // it means that its sources must also be stopped, and the check must go on recursively.
+        // This is important for a case in which we have a cluster where two processors (for example) are connected with a funnel in between.
+        // In this case, if a user deletes the connection between the funnel and its destination, the web request that is made will be done in two
+        // phases: (1) Verify that the request is valid and (2) Delete the connection. But if we don't recursively ensure that the upstream components
+        // are stopped, we could have all nodes in the cluster verify the request is valid in the first phase. But before the second phase occurs, one
+        // node may now have data within the Connection, so the second phase (the delete) will fail. In that situation, the node's dataflow will differ
+        // from the rest of the cluster, and the node will be kicked out of the cluster. To avoid this, we simply ensure that the source is stopped,
+        // and if the source is a funnel (which can't be stopped) that its sources are stopped.
+        verifySourceStoppedOrFunnel(this);
 
         final Connectable dest = destination.get();
         if (dest.isRunning()) {
@@ -511,6 +516,24 @@ public final class StandardConnection implements Connection, ConnectionEventList
         }
     }
 
+    private void verifySourceStoppedOrFunnel(final Connection connection) {
+        final Connectable sourceComponent = connection.getSource();
+        if (!sourceComponent.isRunning()) {
+            return;
+        }
+
+        final ConnectableType connectableType = sourceComponent.getConnectableType();
+        if (connectableType != ConnectableType.FUNNEL) {
+            // Source is running and not a funnel. Source is not considered stopped.
+            throw new IllegalStateException("Upstream component of Connection (" + sourceComponent + ") is running");
+        }
+
+        // Source is a funnel and is running. We need to then check all of its upstream components.
+        for (final Connection incoming : sourceComponent.getIncomingConnections()) {
+            verifySourceStoppedOrFunnel(incoming);
+        }
+    }
+
     @Override
     public Optional<String> getVersionedComponentId() {
         return Optional.ofNullable(versionedComponentId.get());