You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2020/03/17 12:48:10 UTC

[nifi] 34/47: NIFI-7241: When updating Process Group to match VersionedProcessGroup, remove any connections before recursing into child groups. This ensures that if a Port exists in child group A and is connected to a port in child group B, if the VersionedProcessGroup indicates to remove the port, that connection will be removed before attempting to remove the port. Updating and adding connections must still be done last, after all components have been added. But missing connections can be removed earl [...]

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.11.x
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit eeb7d698101a38731bee4d857cd24a4fb1bb5561
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Wed Mar 11 16:48:49 2020 -0400

    NIFI-7241: When updating Process Group to match VersionedProcessGroup, remove any connections before recursing into child groups. This ensures that if a Port exists in child group A and is connected to a port in child group B, if the VersionedProcessGroup indicates to remove the port, that connection will be removed before attempting to remove the port. Updating and adding connections must still be done last, after all components have been added. But missing connections can be removed [...]
    joewitt Added Mark Payne provided update since this change was dependent on NIFI-6873 so it would work in nifi 1.11.4
    
    Signed-off-by: Pierre Villard <pi...@gmail.com>
    
    This closes #4136.
---
 .../apache/nifi/groups/StandardProcessGroup.java   | 40 +++++++++++++---------
 1 file changed, 23 insertions(+), 17 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 7cdb5b7..8364d95 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -3803,7 +3803,7 @@ public final class StandardProcessGroup implements ProcessGroup {
             ControllerServiceNode service = servicesByVersionedId.get(proposedService.getIdentifier());
             if (service == null) {
                 service = addControllerService(group, proposedService, componentIdSeed);
-                LOG.info("Added {} to {}", service, this);
+                LOG.info("Added {} to {}", service, group);
                 servicesAdded.put(proposedService.getIdentifier(), service);
             }
 
@@ -3835,6 +3835,27 @@ public final class StandardProcessGroup implements ProcessGroup {
             controllerServicesRemoved.remove(proposedService.getIdentifier());
         }
 
+        // Before we can update child groups, we must first remove any connections that are connected to those child groups' input/output ports.
+        // We cannot add or update connections yet, though. That must be done at the end, as it's possible that the component that is the source/destination of the connection
+        // has not yet been added.
+        final Map<String, Connection> connectionsByVersionedId = group.getConnections().stream()
+                .collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity()));
+        final Set<String> connectionsRemoved = new HashSet<>(connectionsByVersionedId.keySet());
+
+        for (final VersionedConnection proposedConnection : proposed.getConnections()) {
+            connectionsRemoved.remove(proposedConnection.getIdentifier());
+        }
+
+        // Connections must be the first thing to remove, not the last. Otherwise, we will fail
+        // to remove a component if it has a connection going to it!
+        for (final String removedVersionedId : connectionsRemoved) {
+            final Connection connection = connectionsByVersionedId.get(removedVersionedId);
+            LOG.info("Removing {} from {}", connection, group);
+            group.removeConnection(connection);
+            flowManager.onConnectionRemoved(connection);
+        }
+
+
         // Child groups
         final Map<String, ProcessGroup> childGroupsByVersionedId = group.getProcessGroups().stream()
                 .collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity()));
@@ -4019,11 +4040,7 @@ public final class StandardProcessGroup implements ProcessGroup {
         }
 
 
-        // Connections
-        final Map<String, Connection> connectionsByVersionedId = group.getConnections().stream()
-                .collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity()));
-        final Set<String> connectionsRemoved = new HashSet<>(connectionsByVersionedId.keySet());
-
+        // Add and update Connections
         for (final VersionedConnection proposedConnection : proposed.getConnections()) {
             final Connection connection = connectionsByVersionedId.get(proposedConnection.getIdentifier());
             if (connection == null) {
@@ -4037,21 +4054,10 @@ public final class StandardProcessGroup implements ProcessGroup {
                 updateConnection(connection, proposedConnection);
                 LOG.info("Updated {}", connection);
             }
-
-            connectionsRemoved.remove(proposedConnection.getIdentifier());
         }
 
         // Remove components that exist in the local flow but not the remote flow.
 
-        // Connections must be the first thing to remove, not the last. Otherwise, we will fail
-        // to remove a component if it has a connection going to it!
-        for (final String removedVersionedId : connectionsRemoved) {
-            final Connection connection = connectionsByVersionedId.get(removedVersionedId);
-            LOG.info("Removing {} from {}", connection, group);
-            group.removeConnection(connection);
-            flowManager.onConnectionRemoved(connection);
-        }
-
         // Once the appropriate connections have been removed, we may now update Processors' auto-terminated relationships.
         // We cannot do this above, in the 'updateProcessor' call because if a connection is removed and changed to auto-terminated,
         // then updating this in the updateProcessor call above would attempt to set the Relationship to being auto-terminated while a