You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bs...@apache.org on 2023/02/16 13:19:08 UTC

[nifi] branch main updated: NIFI-11189: When synchronizing a ProcessGroup to match a VersionedProcessGroup, do not remove the temporary funnel until the very end. This is important if the temporary funnel already exists in the flow on startup

This is an automated email from the ASF dual-hosted git repository.

bsimon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new fed175d46f NIFI-11189: When synchronizing a ProcessGroup to match a VersionedProcessGroup, do not remove the temporary funnel until the very end. This is important if the temporary funnel already exists in the flow on startup
fed175d46f is described below

commit fed175d46f794cffae8695df6eb63adab4e8a27a
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Wed Feb 15 18:03:10 2023 -0500

    NIFI-11189: When synchronizing a ProcessGroup to match a VersionedProcessGroup, do not remove the temporary funnel until the very end. This is important if the temporary funnel already exists in the flow on startup
    
    Signed-off-by: Bence Simon <bs...@apache.org>
    This closes #6963
---
 .../StandardVersionedComponentSynchronizer.java    | 134 +++++++++++----------
 1 file changed, 69 insertions(+), 65 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
index 3401aa5acd..f7e64336ee 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
@@ -395,76 +395,80 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
         final Set<String> connectionsWithTempDestination = updateConnectionDestinations(group, proposed, connectionsByVersionedId);
 
         try {
-            final Map<String, Funnel> funnelsByVersionedId = componentsById(group, ProcessGroup::getFunnels);
-            final Map<String, ProcessorNode> processorsByVersionedId = componentsById(group, ProcessGroup::getProcessors);
-            final Map<String, Port> inputPortsByVersionedId = componentsById(group, ProcessGroup::getInputPorts);
-            final Map<String, Port> outputPortsByVersionedId = componentsById(group, ProcessGroup::getOutputPorts);
-            final Map<String, Label> labelsByVersionedId = componentsById(group, ProcessGroup::getLabels, Label::getIdentifier, Label::getVersionedComponentId);
-            final Map<String, RemoteProcessGroup> rpgsByVersionedId = componentsById(group, ProcessGroup::getRemoteProcessGroups,
-                RemoteProcessGroup::getIdentifier, RemoteProcessGroup::getVersionedComponentId);
-            final Map<String, ProcessGroup> childGroupsByVersionedId = componentsById(group, ProcessGroup::getProcessGroups, ProcessGroup::getIdentifier, ProcessGroup::getVersionedComponentId);
-
-            removeMissingProcessors(group, proposed, processorsByVersionedId);
-            removeMissingFunnels(group, proposed, funnelsByVersionedId);
-            removeMissingInputPorts(group, proposed, inputPortsByVersionedId);
-            removeMissingOutputPorts(group, proposed, outputPortsByVersionedId);
-            removeMissingLabels(group, proposed, labelsByVersionedId);
-            removeMissingRpg(group, proposed, rpgsByVersionedId);
-            removeMissingChildGroups(group, proposed, childGroupsByVersionedId);
-
-            // Synchronize Child Process Groups
-            synchronizeChildGroups(group, proposed, versionedParameterContexts, childGroupsByVersionedId, parameterProviderReferences, topLevelGroup);
-
-            synchronizeFunnels(group, proposed, funnelsByVersionedId);
-            synchronizeInputPorts(group, proposed, proposedPortFinalNames, inputPortsByVersionedId);
-            synchronizeOutputPorts(group, proposed, proposedPortFinalNames, outputPortsByVersionedId);
-            synchronizeLabels(group, proposed, labelsByVersionedId);
-            synchronizeProcessors(group, proposed, processorsByVersionedId, topLevelGroup);
-            synchronizeRemoteGroups(group, proposed, rpgsByVersionedId);
-        } finally {
-            // Make sure that we reset the connections
-            restoreConnectionDestinations(group, proposed, connectionsByVersionedId, connectionsWithTempDestination);
-            removeTemporaryFunnel(group);
-        }
-
-        Map<String, Parameter> newParameters = new HashMap<>();
-        if (!proposedParameterContextExistsBeforeSynchronize && this.context.getFlowMappingOptions().isMapControllerServiceReferencesToVersionedId()) {
-            Map<String, String> controllerServiceVersionedIdToId = group.getControllerServices(false)
-                .stream()
-                .filter(controllerServiceNode -> controllerServiceNode.getVersionedComponentId().isPresent())
-                .collect(Collectors.toMap(
-                    controllerServiceNode -> controllerServiceNode.getVersionedComponentId().get(),
-                    ComponentNode::getIdentifier
-                ));
-
-            ParameterContext parameterContext = group.getParameterContext();
-
-            if (parameterContext != null) {
-                parameterContext.getParameters().forEach((descriptor, parameter) -> {
-                    List<ParameterReferencedControllerServiceData> referencedControllerServiceData = parameterContext
-                        .getParameterReferenceManager()
-                        .getReferencedControllerServiceData(parameterContext, descriptor.getName());
-
-                    if (referencedControllerServiceData.isEmpty()) {
-                        newParameters.put(descriptor.getName(), parameter);
-                    } else {
-                        final Parameter adjustedParameter = new Parameter(parameter.getDescriptor(), controllerServiceVersionedIdToId.get(parameter.getValue()));
-                        newParameters.put(descriptor.getName(), adjustedParameter);
-                    }
-                });
+            try {
+                final Map<String, Funnel> funnelsByVersionedId = componentsById(group, ProcessGroup::getFunnels);
+                final Map<String, ProcessorNode> processorsByVersionedId = componentsById(group, ProcessGroup::getProcessors);
+                final Map<String, Port> inputPortsByVersionedId = componentsById(group, ProcessGroup::getInputPorts);
+                final Map<String, Port> outputPortsByVersionedId = componentsById(group, ProcessGroup::getOutputPorts);
+                final Map<String, Label> labelsByVersionedId = componentsById(group, ProcessGroup::getLabels, Label::getIdentifier, Label::getVersionedComponentId);
+                final Map<String, RemoteProcessGroup> rpgsByVersionedId = componentsById(group, ProcessGroup::getRemoteProcessGroups,
+                    RemoteProcessGroup::getIdentifier, RemoteProcessGroup::getVersionedComponentId);
+                final Map<String, ProcessGroup> childGroupsByVersionedId = componentsById(group, ProcessGroup::getProcessGroups, ProcessGroup::getIdentifier, ProcessGroup::getVersionedComponentId);
+
+                removeMissingProcessors(group, proposed, processorsByVersionedId);
+                removeMissingFunnels(group, proposed, funnelsByVersionedId);
+                removeMissingInputPorts(group, proposed, inputPortsByVersionedId);
+                removeMissingOutputPorts(group, proposed, outputPortsByVersionedId);
+                removeMissingLabels(group, proposed, labelsByVersionedId);
+                removeMissingRpg(group, proposed, rpgsByVersionedId);
+                removeMissingChildGroups(group, proposed, childGroupsByVersionedId);
+
+                // Synchronize Child Process Groups
+                synchronizeChildGroups(group, proposed, versionedParameterContexts, childGroupsByVersionedId, parameterProviderReferences, topLevelGroup);
+
+                synchronizeFunnels(group, proposed, funnelsByVersionedId);
+                synchronizeInputPorts(group, proposed, proposedPortFinalNames, inputPortsByVersionedId);
+                synchronizeOutputPorts(group, proposed, proposedPortFinalNames, outputPortsByVersionedId);
+                synchronizeLabels(group, proposed, labelsByVersionedId);
+                synchronizeProcessors(group, proposed, processorsByVersionedId, topLevelGroup);
+                synchronizeRemoteGroups(group, proposed, rpgsByVersionedId);
+            } finally {
+                // Make sure that we reset the connections
+                restoreConnectionDestinations(group, proposed, connectionsByVersionedId, connectionsWithTempDestination);
+            }
+
+            Map<String, Parameter> newParameters = new HashMap<>();
+            if (!proposedParameterContextExistsBeforeSynchronize && this.context.getFlowMappingOptions().isMapControllerServiceReferencesToVersionedId()) {
+                Map<String, String> controllerServiceVersionedIdToId = group.getControllerServices(false)
+                    .stream()
+                    .filter(controllerServiceNode -> controllerServiceNode.getVersionedComponentId().isPresent())
+                    .collect(Collectors.toMap(
+                        controllerServiceNode -> controllerServiceNode.getVersionedComponentId().get(),
+                        ComponentNode::getIdentifier
+                    ));
+
+                ParameterContext parameterContext = group.getParameterContext();
+
+                if (parameterContext != null) {
+                    parameterContext.getParameters().forEach((descriptor, parameter) -> {
+                        List<ParameterReferencedControllerServiceData> referencedControllerServiceData = parameterContext
+                            .getParameterReferenceManager()
+                            .getReferencedControllerServiceData(parameterContext, descriptor.getName());
+
+                        if (referencedControllerServiceData.isEmpty()) {
+                            newParameters.put(descriptor.getName(), parameter);
+                        } else {
+                            final Parameter adjustedParameter = new Parameter(parameter.getDescriptor(), controllerServiceVersionedIdToId.get(parameter.getValue()));
+                            newParameters.put(descriptor.getName(), adjustedParameter);
+                        }
+                    });
 
-                parameterContext.setParameters(newParameters);
+                    parameterContext.setParameters(newParameters);
+                }
             }
-        }
 
-        // We can now add in any necessary connections, since all connectable components have now been created.
-        synchronizeConnections(group, proposed, connectionsByVersionedId);
+            // We can now add in any necessary connections, since all connectable components have now been created.
+            synchronizeConnections(group, proposed, connectionsByVersionedId);
 
-        // All ports have now been added/removed as necessary. We can now resolve the port names.
-        updatePortsToFinalNames(proposedPortFinalNames);
+            // All ports have now been added/removed as necessary. We can now resolve the port names.
+            updatePortsToFinalNames(proposedPortFinalNames);
 
-        // Start all components that are queued up to be started now
-        context.getComponentScheduler().resume();
+            // Start all components that are queued up to be started now
+            context.getComponentScheduler().resume();
+        } finally {
+            // If we created a temporary funnel, remove it if there's no longer anything pointing to it.
+            removeTemporaryFunnel(group);
+        }
     }
 
     private String determineRegistryId(final VersionedFlowCoordinates coordinates) {