You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bs...@apache.org on 2023/02/16 14:35:36 UTC

[nifi] branch support/nifi-1.x updated: NIFI-11189: When synchronizing a ProcessGroup to match a VersionedProcessGroup, do not remove the temporary funnel until the very end. This is important if the temporary funnel already exists in the flow on startup

This is an automated email from the ASF dual-hosted git repository.

bsimon pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new 6f2b9e5f65 NIFI-11189: When synchronizing a ProcessGroup to match a VersionedProcessGroup, do not remove the temporary funnel until the very end. This is important if the temporary funnel already exists in the flow on startup
6f2b9e5f65 is described below

commit 6f2b9e5f65198b0eb3870d9ae268429803f31a3f
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Wed Feb 15 18:03:10 2023 -0500

    NIFI-11189: When synchronizing a ProcessGroup to match a VersionedProcessGroup, do not remove the temporary funnel until the very end. This is important if the temporary funnel already exists in the flow on startup
    
    Signed-off-by: Bence Simon <bs...@apache.org>
    This closes #6963
---
 .../StandardVersionedComponentSynchronizer.java    | 134 +++++++++++----------
 1 file changed, 69 insertions(+), 65 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
index 519089f048..388d1a28b4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
@@ -395,76 +395,80 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
         final Set<String> connectionsWithTempDestination = updateConnectionDestinations(group, proposed, connectionsByVersionedId);
 
         try {
-            final Map<String, Funnel> funnelsByVersionedId = componentsById(group, ProcessGroup::getFunnels);
-            final Map<String, ProcessorNode> processorsByVersionedId = componentsById(group, ProcessGroup::getProcessors);
-            final Map<String, Port> inputPortsByVersionedId = componentsById(group, ProcessGroup::getInputPorts);
-            final Map<String, Port> outputPortsByVersionedId = componentsById(group, ProcessGroup::getOutputPorts);
-            final Map<String, Label> labelsByVersionedId = componentsById(group, ProcessGroup::getLabels, Label::getIdentifier, Label::getVersionedComponentId);
-            final Map<String, RemoteProcessGroup> rpgsByVersionedId = componentsById(group, ProcessGroup::getRemoteProcessGroups,
-                RemoteProcessGroup::getIdentifier, RemoteProcessGroup::getVersionedComponentId);
-            final Map<String, ProcessGroup> childGroupsByVersionedId = componentsById(group, ProcessGroup::getProcessGroups, ProcessGroup::getIdentifier, ProcessGroup::getVersionedComponentId);
-
-            removeMissingProcessors(group, proposed, processorsByVersionedId);
-            removeMissingFunnels(group, proposed, funnelsByVersionedId);
-            removeMissingInputPorts(group, proposed, inputPortsByVersionedId);
-            removeMissingOutputPorts(group, proposed, outputPortsByVersionedId);
-            removeMissingLabels(group, proposed, labelsByVersionedId);
-            removeMissingRpg(group, proposed, rpgsByVersionedId);
-            removeMissingChildGroups(group, proposed, childGroupsByVersionedId);
-
-            // Synchronize Child Process Groups
-            synchronizeChildGroups(group, proposed, versionedParameterContexts, childGroupsByVersionedId, parameterProviderReferences, topLevelGroup);
-
-            synchronizeFunnels(group, proposed, funnelsByVersionedId);
-            synchronizeInputPorts(group, proposed, proposedPortFinalNames, inputPortsByVersionedId);
-            synchronizeOutputPorts(group, proposed, proposedPortFinalNames, outputPortsByVersionedId);
-            synchronizeLabels(group, proposed, labelsByVersionedId);
-            synchronizeProcessors(group, proposed, processorsByVersionedId, topLevelGroup);
-            synchronizeRemoteGroups(group, proposed, rpgsByVersionedId);
-        } finally {
-            // Make sure that we reset the connections
-            restoreConnectionDestinations(group, proposed, connectionsByVersionedId, connectionsWithTempDestination);
-            removeTemporaryFunnel(group);
-        }
-
-        Map<String, Parameter> newParameters = new HashMap<>();
-        if (!proposedParameterContextExistsBeforeSynchronize && this.context.getFlowMappingOptions().isMapControllerServiceReferencesToVersionedId()) {
-            Map<String, String> controllerServiceVersionedIdToId = group.getControllerServices(false)
-                .stream()
-                .filter(controllerServiceNode -> controllerServiceNode.getVersionedComponentId().isPresent())
-                .collect(Collectors.toMap(
-                    controllerServiceNode -> controllerServiceNode.getVersionedComponentId().get(),
-                    ComponentNode::getIdentifier
-                ));
-
-            ParameterContext parameterContext = group.getParameterContext();
-
-            if (parameterContext != null) {
-                parameterContext.getParameters().forEach((descriptor, parameter) -> {
-                    List<ParameterReferencedControllerServiceData> referencedControllerServiceData = parameterContext
-                        .getParameterReferenceManager()
-                        .getReferencedControllerServiceData(parameterContext, descriptor.getName());
-
-                    if (referencedControllerServiceData.isEmpty()) {
-                        newParameters.put(descriptor.getName(), parameter);
-                    } else {
-                        final Parameter adjustedParameter = new Parameter(parameter.getDescriptor(), controllerServiceVersionedIdToId.get(parameter.getValue()));
-                        newParameters.put(descriptor.getName(), adjustedParameter);
-                    }
-                });
+            try {
+                final Map<String, Funnel> funnelsByVersionedId = componentsById(group, ProcessGroup::getFunnels);
+                final Map<String, ProcessorNode> processorsByVersionedId = componentsById(group, ProcessGroup::getProcessors);
+                final Map<String, Port> inputPortsByVersionedId = componentsById(group, ProcessGroup::getInputPorts);
+                final Map<String, Port> outputPortsByVersionedId = componentsById(group, ProcessGroup::getOutputPorts);
+                final Map<String, Label> labelsByVersionedId = componentsById(group, ProcessGroup::getLabels, Label::getIdentifier, Label::getVersionedComponentId);
+                final Map<String, RemoteProcessGroup> rpgsByVersionedId = componentsById(group, ProcessGroup::getRemoteProcessGroups,
+                    RemoteProcessGroup::getIdentifier, RemoteProcessGroup::getVersionedComponentId);
+                final Map<String, ProcessGroup> childGroupsByVersionedId = componentsById(group, ProcessGroup::getProcessGroups, ProcessGroup::getIdentifier, ProcessGroup::getVersionedComponentId);
+
+                removeMissingProcessors(group, proposed, processorsByVersionedId);
+                removeMissingFunnels(group, proposed, funnelsByVersionedId);
+                removeMissingInputPorts(group, proposed, inputPortsByVersionedId);
+                removeMissingOutputPorts(group, proposed, outputPortsByVersionedId);
+                removeMissingLabels(group, proposed, labelsByVersionedId);
+                removeMissingRpg(group, proposed, rpgsByVersionedId);
+                removeMissingChildGroups(group, proposed, childGroupsByVersionedId);
+
+                // Synchronize Child Process Groups
+                synchronizeChildGroups(group, proposed, versionedParameterContexts, childGroupsByVersionedId, parameterProviderReferences, topLevelGroup);
+
+                synchronizeFunnels(group, proposed, funnelsByVersionedId);
+                synchronizeInputPorts(group, proposed, proposedPortFinalNames, inputPortsByVersionedId);
+                synchronizeOutputPorts(group, proposed, proposedPortFinalNames, outputPortsByVersionedId);
+                synchronizeLabels(group, proposed, labelsByVersionedId);
+                synchronizeProcessors(group, proposed, processorsByVersionedId, topLevelGroup);
+                synchronizeRemoteGroups(group, proposed, rpgsByVersionedId);
+            } finally {
+                // Make sure that we reset the connections
+                restoreConnectionDestinations(group, proposed, connectionsByVersionedId, connectionsWithTempDestination);
+            }
+
+            Map<String, Parameter> newParameters = new HashMap<>();
+            if (!proposedParameterContextExistsBeforeSynchronize && this.context.getFlowMappingOptions().isMapControllerServiceReferencesToVersionedId()) {
+                Map<String, String> controllerServiceVersionedIdToId = group.getControllerServices(false)
+                    .stream()
+                    .filter(controllerServiceNode -> controllerServiceNode.getVersionedComponentId().isPresent())
+                    .collect(Collectors.toMap(
+                        controllerServiceNode -> controllerServiceNode.getVersionedComponentId().get(),
+                        ComponentNode::getIdentifier
+                    ));
+
+                ParameterContext parameterContext = group.getParameterContext();
+
+                if (parameterContext != null) {
+                    parameterContext.getParameters().forEach((descriptor, parameter) -> {
+                        List<ParameterReferencedControllerServiceData> referencedControllerServiceData = parameterContext
+                            .getParameterReferenceManager()
+                            .getReferencedControllerServiceData(parameterContext, descriptor.getName());
+
+                        if (referencedControllerServiceData.isEmpty()) {
+                            newParameters.put(descriptor.getName(), parameter);
+                        } else {
+                            final Parameter adjustedParameter = new Parameter(parameter.getDescriptor(), controllerServiceVersionedIdToId.get(parameter.getValue()));
+                            newParameters.put(descriptor.getName(), adjustedParameter);
+                        }
+                    });
 
-                parameterContext.setParameters(newParameters);
+                    parameterContext.setParameters(newParameters);
+                }
             }
-        }
 
-        // We can now add in any necessary connections, since all connectable components have now been created.
-        synchronizeConnections(group, proposed, connectionsByVersionedId);
+            // We can now add in any necessary connections, since all connectable components have now been created.
+            synchronizeConnections(group, proposed, connectionsByVersionedId);
 
-        // All ports have now been added/removed as necessary. We can now resolve the port names.
-        updatePortsToFinalNames(proposedPortFinalNames);
+            // All ports have now been added/removed as necessary. We can now resolve the port names.
+            updatePortsToFinalNames(proposedPortFinalNames);
 
-        // Start all components that are queued up to be started now
-        context.getComponentScheduler().resume();
+            // Start all components that are queued up to be started now
+            context.getComponentScheduler().resume();
+        } finally {
+            // If we created a temporary funnel, remove it if there's no longer anything pointing to it.
+            removeTemporaryFunnel(group);
+        }
     }
 
     private String determineRegistryId(final VersionedFlowCoordinates coordinates) {