You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2022/04/08 14:08:42 UTC

[nifi] branch main updated: NIFI-9875 In StandardProcessGroupSynchronizer.updateConnectionDestinations handle special case when output port is moved to a child process group while kept connected. (#5931)

This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 1cf4e72084 NIFI-9875 In StandardProcessGroupSynchronizer.updateConnectionDestinations handle special case when output port is moved to a child process group while kept connected. (#5931)
1cf4e72084 is described below

commit 1cf4e72084f30707ffb0676f77355a810c2d6e9e
Author: tpalfy <53...@users.noreply.github.com>
AuthorDate: Fri Apr 8 16:08:31 2022 +0200

    NIFI-9875 In StandardProcessGroupSynchronizer.updateConnectionDestinations handle special case when output port is moved to a child process group while kept connected. (#5931)
---
 .../groups/StandardProcessGroupSynchronizer.java   |  10 +-
 .../nifi/integration/versioned/ImportFlowIT.java   | 281 +++++++--------------
 2 files changed, 103 insertions(+), 188 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java
index cefaf22ada..8c5592dc18 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java
@@ -250,7 +250,9 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
 
         // Ensure that we create all Parameter Contexts before updating them. This is necessary in case the proposed incoming dataflow has
         // parameter contexts that inherit from one another and neither the inheriting nor inherited parameter context exists.
-        versionedParameterContexts.values().forEach(this::createParameterContextWithoutReferences);
+        if (versionedParameterContexts != null) {
+            versionedParameterContexts.values().forEach(this::createParameterContextWithoutReferences);
+        }
 
         updateParameterContext(group, proposed, versionedParameterContexts, context.getComponentIdGenerator());
         updateVariableRegistry(group, proposed);
@@ -559,7 +561,11 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
             // Find the destination of the connection. If the destination doesn't yet exist (because it's part of the proposed Process Group but not yet added),
             // we will set the destination to a temporary destination. Then, after adding components, we will update the destinations again.
             Connectable newDestination = getConnectable(group, proposedConnection.getDestination());
-            if (newDestination == null) {
+            if (
+                newDestination == null
+                ||
+                (newDestination.getConnectableType() == ConnectableType.OUTPUT_PORT && !newDestination.getProcessGroup().equals(connection.getProcessGroup()))
+            ) {
                 final Funnel temporaryDestination = getTemporaryFunnel(connection.getProcessGroup());
                 LOG.debug("Updated Connection {} to have a temporary destination of {}", connection, temporaryDestination);
                 newDestination = temporaryDestination;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/versioned/ImportFlowIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/versioned/ImportFlowIT.java
index 92f711e87a..2c82165e67 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/versioned/ImportFlowIT.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/versioned/ImportFlowIT.java
@@ -27,16 +27,9 @@ import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.StandardSnippet;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.flow.Bundle;
-import org.apache.nifi.flow.VersionedConnection;
-import org.apache.nifi.flow.VersionedControllerService;
 import org.apache.nifi.flow.VersionedExternalFlow;
-import org.apache.nifi.flow.VersionedExternalFlowMetadata;
-import org.apache.nifi.flow.VersionedFunnel;
-import org.apache.nifi.flow.VersionedParameter;
 import org.apache.nifi.flow.VersionedParameterContext;
-import org.apache.nifi.flow.VersionedPort;
 import org.apache.nifi.flow.VersionedProcessGroup;
-import org.apache.nifi.flow.VersionedProcessor;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.integration.DirectInjectionExtensionManager;
 import org.apache.nifi.integration.FrameworkIntegrationTest;
@@ -61,12 +54,13 @@ import org.apache.nifi.registry.flow.diff.FlowComparison;
 import org.apache.nifi.registry.flow.diff.FlowDifference;
 import org.apache.nifi.registry.flow.diff.StandardComparableDataFlow;
 import org.apache.nifi.registry.flow.diff.StandardFlowComparator;
+import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup;
 import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
 import org.apache.nifi.util.FlowDifferenceFilters;
 import org.jetbrains.annotations.NotNull;
 import org.junit.Test;
 
-import java.util.ArrayList;
+import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -85,6 +79,14 @@ import static org.testng.Assert.assertNotNull;
 import static org.testng.AssertJUnit.assertNull;
 
 public class ImportFlowIT extends FrameworkIntegrationTest {
+    @Override
+    public void setup() throws IOException {
+        super.setup();
+
+        for (ProcessorNode processor : getRootGroup().getProcessors()) {
+            getRootGroup().removeProcessor(processor);
+        }
+    }
 
     @Override
     protected void injectExtensionTypes(final DirectInjectionExtensionManager extensionManager) {
@@ -103,7 +105,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
         processor.setAutoTerminatedRelationships(Collections.singleton(REL_SUCCESS));
         processor.setProperties(Collections.singletonMap(NopServiceReferencingProcessor.SERVICE.getName(), controllerService.getIdentifier()));
 
-        final VersionedExternalFlow proposedFlow = createFlowSnapshot(Collections.singletonList(controllerService), Collections.singletonList(processor), null);
+        final VersionedExternalFlow proposedFlow = createFlowSnapshot();
 
         // Create an Inner Process Group and update it to match the Versioned Flow.
         final ProcessGroup innerGroup = getFlowController().getFlowManager().createProcessGroup("inner-group-id");
@@ -147,7 +149,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
         processor.setProperties(Collections.singletonMap(UsernamePasswordProcessor.PASSWORD.getName(), "password"));
 
         // Create a VersionedExternalFlow that contains the processor
-        final VersionedExternalFlow versionedFlowWithExplicitValue = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(processor), null);
+        final VersionedExternalFlow versionedFlowWithExplicitValue = createFlowSnapshot();
 
         // Create child group
         final ProcessGroup innerGroup = getFlowController().getFlowManager().createProcessGroup("inner-group-id");
@@ -173,7 +175,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
         assertEquals(DifferenceType.PROPERTY_PARAMETERIZED, differences.iterator().next().getDifferenceType());
 
         // Create a Versioned Flow that contains the Parameter Reference.
-        final VersionedExternalFlow versionedFlowWithParameterReference = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(processor), null);
+        final VersionedExternalFlow versionedFlowWithParameterReference = createFlowSnapshot(innerGroup);
 
         // Ensure no difference between the current configuration and the versioned flow
         differences = getLocalModifications(innerGroup, versionedFlowWithParameterReference);
@@ -193,7 +195,8 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
 
         // Create a VersionedExternalFlow that contains the processor
         final Parameter parameter = new Parameter(new ParameterDescriptor.Builder().name("secret-param").sensitive(true).build(), null);
-        final VersionedExternalFlow versionedFlowWithParameterReference = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(processor), Collections.singleton(parameter));
+        setParameter(parameter);
+        final VersionedExternalFlow versionedFlowWithParameterReference = createFlowSnapshot();
 
         // Create child group
         final ProcessGroup innerGroup = getFlowController().getFlowManager().createProcessGroup("inner-group-id");
@@ -225,7 +228,8 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
 
         // Create a VersionedExternalFlow that contains the processor
         final Parameter parameter = new Parameter(new ParameterDescriptor.Builder().name("secret-param").sensitive(true).build(), null);
-        final VersionedExternalFlow versionedFlowWithParameterReference = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(processor), Collections.singleton(parameter));
+        setParameter(parameter);
+        final VersionedExternalFlow versionedFlowWithParameterReference = createFlowSnapshot();
 
         // Create child group
         final ProcessGroup innerGroup = getFlowController().getFlowManager().createProcessGroup("inner-group-id");
@@ -255,13 +259,12 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
 
         // Create a VersionedExternalFlow that contains the processor
         final Parameter parameter = new Parameter(new ParameterDescriptor.Builder().name("secret-param").sensitive(true).build(), null);
-        final VersionedExternalFlow versionedFlowWithParameterReference = createFlowSnapshot(Collections.emptyList(),
-            Collections.singletonList(initialProcessor), Collections.singleton(parameter));
-
+        setParameter(parameter);
+        final VersionedExternalFlow versionedFlowWithParameterReference = createFlowSnapshot();
 
         // Update processor to have an explicit value for the second version of the flow.
         initialProcessor.setProperties(Collections.singletonMap(UsernamePasswordProcessor.PASSWORD.getName(), "secret-value"));
-        final VersionedExternalFlow versionedFlowExplicitValue = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(initialProcessor), null);
+        final VersionedExternalFlow versionedFlowExplicitValue = createFlowSnapshot();
 
         // Create child group and update to the first version of the flow, with parameter ref
         final ProcessGroup innerGroup = getFlowController().getFlowManager().createProcessGroup("inner-group-id");
@@ -290,7 +293,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
         initialProperties.put(UsernamePasswordProcessor.PASSWORD.getName(), "pass");
         initialProcessor.setProperties(initialProperties);
 
-        final VersionedExternalFlow initialVersionSnapshot = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(initialProcessor), null);
+        final VersionedExternalFlow initialVersionSnapshot = createFlowSnapshot();
 
         // Update processor to have a different explicit value for both sensitive and non-sensitive properties and create a versioned flow for it.
         final Map<String, String> updatedProperties = new HashMap<>();
@@ -298,7 +301,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
         updatedProperties.put(UsernamePasswordProcessor.PASSWORD.getName(), "pass");
         initialProcessor.setProperties(updatedProperties);
 
-        final VersionedExternalFlow updatedVersionSnapshot = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(initialProcessor), null);
+        final VersionedExternalFlow updatedVersionSnapshot = createFlowSnapshot();
 
         // Create child group and update to the first version of the flow, with parameter ref
         final ProcessGroup innerGroup = getFlowController().getFlowManager().createProcessGroup("inner-group-id");
@@ -335,7 +338,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
         initialProperties.put(UsernamePasswordProcessor.PASSWORD.getName(), "#{secret-param}");
         initialProcessor.setProperties(initialProperties);
 
-        final VersionedExternalFlow initialVersionSnapshot = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(initialProcessor), null);
+        final VersionedExternalFlow initialVersionSnapshot = createFlowSnapshot();
 
         // Update processor to have a different explicit value for both sensitive and non-sensitive properties and create a versioned flow for it.
         final Map<String, String> updatedProperties = new HashMap<>();
@@ -343,7 +346,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
         updatedProperties.put(UsernamePasswordProcessor.PASSWORD.getName(), "#{other-param}");
         initialProcessor.setProperties(updatedProperties);
 
-        final VersionedExternalFlow updatedVersionSnapshot = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(initialProcessor), null);
+        final VersionedExternalFlow updatedVersionSnapshot = createFlowSnapshot();
 
         // Create child group and update to the first version of the flow, with parameter ref
         final ProcessGroup innerGroup = getFlowController().getFlowManager().createProcessGroup("inner-group-id");
@@ -362,19 +365,16 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
     @Test
     public void testChangeVersionFromExplicitValueToParameterSensitiveProperty() {
         // Create a processor with a sensitive property
-        final ProcessorNode processorWithParamRef = createProcessorNode(UsernamePasswordProcessor.class);
-        processorWithParamRef.setProperties(Collections.singletonMap(UsernamePasswordProcessor.PASSWORD.getName(), "#{secret-param}"));
-
-        final ProcessorNode processorWithExplicitValue = createProcessorNode(UsernamePasswordProcessor.class);
-        processorWithExplicitValue.setProperties(Collections.singletonMap(UsernamePasswordProcessor.PASSWORD.getName(), "secret-value"));
-
+        final ProcessorNode processor = createProcessorNode(UsernamePasswordProcessor.class);
+        processor.setProperties(Collections.singletonMap(UsernamePasswordProcessor.PASSWORD.getName(), "#{secret-param}"));
 
         // Create a VersionedExternalFlow that contains the processor
         final Parameter parameter = new Parameter(new ParameterDescriptor.Builder().name("secret-param").sensitive(true).build(), null);
-        final VersionedExternalFlow versionedFlowWithParameterReference = createFlowSnapshot(Collections.emptyList(),
-            Collections.singletonList(processorWithParamRef), Collections.singleton(parameter));
+        setParameter(parameter);
+        final VersionedExternalFlow versionedFlowWithParameterReference = createFlowSnapshot();
 
-        final VersionedExternalFlow versionedFlowExplicitValue = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(processorWithExplicitValue), null);
+        processor.setProperties(Collections.singletonMap(UsernamePasswordProcessor.PASSWORD.getName(), "secret-value"));
+        final VersionedExternalFlow versionedFlowExplicitValue = createFlowSnapshot();
 
         // Create child group and update to the first version of the flow, with parameter ref
         final ProcessGroup innerGroup = getFlowController().getFlowManager().createProcessGroup("inner-group-id");
@@ -386,13 +386,13 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
         final ProcessorNode nodeInGroupWithRef = innerGroup.getProcessors().iterator().next();
         assertNotNull(nodeInGroupWithRef.getProperty(UsernamePasswordProcessor.PASSWORD));
 
-
         // Update the flow to new version that uses explicit value.
         innerGroup.updateFlow(versionedFlowWithParameterReference, (String) null, true, true, true);
 
         // Updated flow has sensitive property that no longer references parameter. Now is an explicit value, so it should be unset
         final ProcessorNode nodeInGroupWithNoValue = innerGroup.getProcessors().iterator().next();
-        assertEquals("#{secret-param}", nodeInGroupWithNoValue.getProperty(UsernamePasswordProcessor.PASSWORD).getRawValue());
+        String actual = nodeInGroupWithNoValue.getProperty(UsernamePasswordProcessor.PASSWORD).getRawValue();
+        assertEquals("#{secret-param}", actual);
     }
 
     @Test
@@ -431,18 +431,18 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
         assertTrue(groupA.getProcessors().isEmpty());
         assertTrue(groupA.getConnections().isEmpty());
         assertEquals(1, groupA.getInputPorts().size());
-        assertEquals(port.getVersionedComponentId(), groupA.getInputPorts().stream().findFirst().get().getVersionedComponentId());
+        assertEquals(port.getName(), groupA.getInputPorts().stream().findFirst().get().getName());
 
         //Change Process Group A version to Version 2
         groupA.updateFlow(version2, null, false, true, true);
 
         //Process Group A should have a Process Group, a Processor and a Connection and no Input Ports
         assertEquals(1, groupA.getProcessGroups().size());
-        assertEquals(groupB.getVersionedComponentId(), groupA.getProcessGroups().stream().findFirst().get().getVersionedComponentId());
+        assertEquals(groupB.getName(), groupA.getProcessGroups().stream().findFirst().get().getName());
         assertEquals(1, groupA.getProcessors().size());
-        assertEquals(processor.getVersionedComponentId(), groupA.getProcessors().stream().findFirst().get().getVersionedComponentId());
+        assertEquals(processor.getName(), groupA.getProcessors().stream().findFirst().get().getName());
         assertEquals(1, groupA.getConnections().size());
-        assertEquals(connection.getVersionedComponentId(), groupA.getConnections().stream().findFirst().get().getVersionedComponentId());
+        assertEquals(connection.getName(), groupA.getConnections().stream().findFirst().get().getName());
         assertTrue(groupA.getInputPorts().isEmpty());
     }
 
@@ -487,9 +487,9 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
         assertEquals(1, group.getConnections().size());
         assertEquals(connection.getVersionedComponentId(), group.getConnections().stream().findFirst().get().getVersionedComponentId());
         assertEquals(1, group.getOutputPorts().size());
-        assertEquals(port.getVersionedComponentId(), group.getOutputPorts().stream().findFirst().get().getVersionedComponentId());
+        assertEquals(port.getName(), group.getOutputPorts().stream().findFirst().get().getName());
         assertTrue(group.getFunnels().isEmpty());
-        assertEquals(connection.getDestination().getVersionedComponentId(), port.getVersionedComponentId());
+        assertEquals(connection.getDestination().getName(), port.getName());
 
         //Change Process Group version to Version 2
         group.updateFlow(version2, null, false, true, true);
@@ -497,12 +497,12 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
         //Process Group should have a Funnel, a Processor, a Connection and no Output Ports
         assertTrue(group.getOutputPorts().isEmpty());
         assertEquals(1, group.getProcessors().size());
-        assertEquals(processor.getVersionedComponentId(), group.getProcessors().stream().findFirst().get().getVersionedComponentId());
+        assertEquals(processor.getName(), group.getProcessors().stream().findFirst().get().getName());
         assertEquals(1, group.getConnections().size());
-        assertEquals(connection.getVersionedComponentId(), group.getConnections().stream().findFirst().get().getVersionedComponentId());
+        assertEquals(connection.getName(), group.getConnections().stream().findFirst().get().getName());
         assertEquals(1, group.getFunnels().size());
-        assertEquals(funnel.getVersionedComponentId(), group.getFunnels().stream().findFirst().get().getVersionedComponentId());
-        assertEquals(connection.getDestination().getVersionedComponentId(), funnel.getVersionedComponentId());
+        assertEquals(funnel.getName(), group.getFunnels().stream().findFirst().get().getName());
+        assertEquals(connection.getDestination().getName(), funnel.getName());
     }
 
     @Test
@@ -511,45 +511,45 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
         final ProcessGroup groupA = createProcessGroup("group-a-id", "Group A", getRootGroup());
 
         //Create Process Group B under Process Group A
-        final ProcessGroup groupB = createProcessGroup("group-b-id", "Group B", groupA);
+        final ProcessGroup groupBunderA = createProcessGroup("group-b-id", "Group B", groupA);
 
         //Add Input port under Process Group B
-        final Port inputPort = getFlowController().getFlowManager().createLocalInputPort("input-port-id", "Input Port");
-        groupB.addInputPort(inputPort);
+        final Port inputPortBThenStayThenDelete = getFlowController().getFlowManager().createLocalInputPort("input-port-id", "Input Port");
+        groupBunderA.addInputPort(inputPortBThenStayThenDelete);
 
         //Add Processor 1 under Process Group A
-        final ProcessorNode processor1 = createProcessorNode(GenerateProcessor.class, groupA);
+        final ProcessorNode processorA1 = createProcessorNode(GenerateProcessor.class, groupA);
 
         //Add Processor 2 under Process Group A
-        final ProcessorNode processor2 = createProcessorNode(GenerateProcessor.class, groupA);
+        final ProcessorNode processorA2 = createProcessorNode(GenerateProcessor.class, groupA);
 
         //Add Output Port under Process Group A
-        final Port outputPort = getFlowController().getFlowManager().createLocalOutputPort("output-port-id", "Output Port");
-        groupA.addOutputPort(outputPort);
+        final Port outputPortAThenB = getFlowController().getFlowManager().createLocalOutputPort("output-port-id", "Output Port");
+        groupA.addOutputPort(outputPortAThenB);
 
         //Connect Processor 1 and Output Port as Connection 1
-        final Connection connection1 = connect(groupA, processor1, outputPort, processor1.getRelationships());
+        final Connection connectionProcessorA1ToOutputPortAThenProcessorA2 = connect(groupA, processorA1, outputPortAThenB, processorA1.getRelationships());
 
         //Connect Processor 1 and Input Port as Connection 2
-        final Connection connection2 = connect(groupA, processor1, inputPort, processor1.getRelationships());
+        final Connection connectionProcessorA1ToInputPortBThenStayThenDelete = connect(groupA, processorA1, inputPortBThenStayThenDelete, processorA1.getRelationships());
 
         //Create a snapshot
         final VersionedExternalFlow version1 = createFlowSnapshot(groupA);
 
         //Modify Connection 1 to point to Processor 2
-        connection1.setDestination(processor2);
+        connectionProcessorA1ToOutputPortAThenProcessorA2.setDestination(processorA2);
 
         //Move Output Port to Process Group B
-        moveOutputPort(outputPort, groupB);
+        moveOutputPort(outputPortAThenB, groupBunderA);
 
         //Create another snapshot
         final VersionedExternalFlow version2 = createFlowSnapshot(groupA);
 
         //Delete connection 2
-        groupA.removeConnection(connection2);
+        groupA.removeConnection(connectionProcessorA1ToInputPortBThenStayThenDelete);
 
         //Delete Input Port
-        groupB.removeInputPort(inputPort);
+        groupBunderA.removeInputPort(inputPortBThenStayThenDelete);
 
         //Create another snapshot
         final VersionedExternalFlow version3 = createFlowSnapshot(groupA);
@@ -560,27 +560,27 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
         //Process Group A should have two Processors, 2 Connections, one Output Port and one Process Group with one Input Port
         assertEquals(2, groupA.getProcessors().size());
         assertEquals(2, groupA.getConnections().size());
-        assertEquals(connection1.getDestination().getVersionedComponentId(), outputPort.getVersionedComponentId());
+        assertEquals(connectionProcessorA1ToOutputPortAThenProcessorA2.getDestination().getName(), outputPortAThenB.getName());
         assertEquals(1, groupA.getOutputPorts().size());
         assertEquals(1, groupA.getProcessGroups().size());
-        assertEquals(1, groupB.getInputPorts().size());
+        assertEquals(1, groupBunderA.getInputPorts().size());
 
         //Change Process Group version to Version 2
         groupA.updateFlow(version2, null, false, true, true);
 
         //Connection1 destination changed to Processor2 and Output Port moved to Process Group B
         assertTrue(groupA.getOutputPorts().isEmpty());
-        assertEquals(connection1.getDestination().getVersionedComponentId(), processor2.getVersionedComponentId());
-        assertEquals(1, groupB.getOutputPorts().size());
-        assertEquals(outputPort.getVersionedComponentId(), groupB.getOutputPorts().stream().findFirst().get().getVersionedComponentId());
+        assertEquals(connectionProcessorA1ToOutputPortAThenProcessorA2.getDestination().getName(), processorA2.getName());
+        assertEquals(1, groupBunderA.getOutputPorts().size());
+        assertEquals(outputPortAThenB.getName(), groupBunderA.getOutputPorts().stream().findFirst().get().getName());
 
         //Change Process Group version to Version 3
         groupA.updateFlow(version3, null, false, true, true);
 
         //Connection2 and Input Port should be deleted
         assertEquals(1, groupA.getConnections().size());
-        assertEquals(connection1.getVersionedComponentId(), groupA.getConnections().stream().findFirst().get().getVersionedComponentId());
-        assertTrue(groupB.getInputPorts().isEmpty());
+        assertEquals(connectionProcessorA1ToOutputPortAThenProcessorA2.getName(), groupA.getConnections().stream().findFirst().get().getName());
+        assertTrue(groupBunderA.getInputPorts().isEmpty());
     }
 
     @Test
@@ -638,7 +638,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
         //Process Group A should have two Processors, 2 Connections, one Output Port and one Process Group with one Input Port
         assertEquals(2, groupA.getProcessors().size());
         assertEquals(2, groupA.getConnections().size());
-        assertEquals(connection1.getDestination().getVersionedComponentId(), outputPort.getVersionedComponentId());
+        assertEquals(connection1.getDestination().getName(), outputPort.getName());
         assertEquals(1, groupA.getOutputPorts().size());
         assertEquals(1, groupA.getProcessGroups().size());
         assertEquals(1, groupB.getInputPorts().size());
@@ -647,7 +647,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
         groupA.updateFlow(version2, null, false, true, true);
 
         //Connection1 destination changed to Processor2 and Output Port deleted
-        assertEquals(connection1.getDestination().getVersionedComponentId(), processor2.getVersionedComponentId());
+        assertEquals(connection1.getDestination().getName(), processor2.getName());
         assertTrue(groupA.getOutputPorts().isEmpty());
         assertTrue(groupB.getOutputPorts().isEmpty());
 
@@ -656,10 +656,22 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
 
         //Connection2 should be deleted and Input Port moved to Process Group A
         assertEquals(1, groupA.getConnections().size());
-        assertEquals(connection1.getVersionedComponentId(), groupA.getConnections().stream().findFirst().get().getVersionedComponentId());
+        assertEquals(connection1.getName(), groupA.getConnections().stream().findFirst().get().getName());
         assertTrue(groupB.getInputPorts().isEmpty());
         assertEquals(1, groupA.getInputPorts().size());
-        assertEquals(inputPort.getVersionedComponentId(), groupA.getInputPorts().stream().findFirst().get().getVersionedComponentId());
+        assertEquals(inputPort.getName(), groupA.getInputPorts().stream().findFirst().get().getName());
+    }
+
+    private void setParameter(Parameter parameter) {
+        ParameterContext rootParameterContext = getFlowController().getFlowManager().getParameterContextManager().getParameterContext("unimportant");
+        if (rootParameterContext == null) {
+            rootParameterContext = getFlowController().getFlowManager().createParameterContext("unimportant", "unimportant", Collections.emptyMap(), Collections.emptyList());
+            getRootGroup().setParameterContext(rootParameterContext);
+        }
+
+        Map<String, Parameter> parameterMap = new HashMap<>();
+        parameterMap.put(parameter.getDescriptor().getName(), parameter);
+        rootParameterContext.setParameters(parameterMap);
     }
 
     private ProcessGroup createProcessGroup(final String groupId, final String groupName, final ProcessGroup destination) {
@@ -707,137 +719,34 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
         return differences;
     }
 
-    private VersionedExternalFlow createFlowSnapshot(final ProcessGroup group, final List<ControllerServiceNode> controllerServices,
-                                                     final List<ProcessorNode> processors, final Set<Parameter> parameters) {
+    private VersionedExternalFlow createFlowSnapshot() {
+        return createFlowSnapshot(getRootGroup());
+    }
+
+    private VersionedExternalFlow createFlowSnapshot(final ProcessGroup group) {
         createBundle();
 
         final NiFiRegistryFlowMapper flowMapper = new NiFiRegistryFlowMapper(getExtensionManager());
 
-        final List<ProcessorNode> processorNodes;
-        final List<ControllerServiceNode> controllerServiceNodes;
-        final List<Port> inputPorts;
-        final List<Port> outputPorts;
-        final List<Funnel> funnels;
-        final List<Connection> connections;
-        final List<ProcessGroup> processGroups;
-        final Set<VersionedProcessGroup> versionedProcessGroups;
-
-        if (group == null) {
-            processorNodes = processors;
-            controllerServiceNodes = controllerServices;
-            inputPorts = Collections.EMPTY_LIST;
-            outputPorts = Collections.EMPTY_LIST;
-            funnels = Collections.EMPTY_LIST;
-            connections = Collections.EMPTY_LIST;
-            versionedProcessGroups = Collections.EMPTY_SET;
-        } else {
-            processorNodes = new ArrayList<>(group.getProcessors());
-            controllerServiceNodes = new ArrayList<>(group.getControllerServices(false));
-            inputPorts = new ArrayList<>(group.getInputPorts());
-            outputPorts = new ArrayList<>(group.getOutputPorts());
-            funnels = new ArrayList<>(group.getFunnels());
-            connections = new ArrayList<>(group.getConnections());
-            processGroups = new ArrayList<>(group.getProcessGroups());
-
-            final VersionedProcessGroup versionedGroup = flowMapper.mapProcessGroup(group, getFlowController().getControllerServiceProvider(),getFlowController().getFlowRegistryClient(),true);
-            processGroups.forEach(processGroup->
-                versionedGroup.getProcessGroups().stream().filter(versionedProcessGroup -> versionedProcessGroup.getName().equals(processGroup.getName()))
-                        .forEach(filteredProcessGroup -> processGroup.setVersionedComponentId(filteredProcessGroup.getIdentifier())));
-            versionedProcessGroups = new HashSet<>(versionedGroup.getProcessGroups());
-        }
-
-        final Set<VersionedProcessor> versionedProcessors = new HashSet<>();
-        for (final ProcessorNode processor : processorNodes) {
-            final VersionedProcessor versionedProcessor = flowMapper.mapProcessor(processor, getFlowController().getControllerServiceProvider(), Collections.emptySet(), new HashMap<>());
-            versionedProcessors.add(versionedProcessor);
-            processor.setVersionedComponentId(versionedProcessor.getIdentifier());
-        }
-
-        final Set<VersionedControllerService> versionedServices = new HashSet<>();
-        for (final ControllerServiceNode serviceNode : controllerServiceNodes) {
-            final VersionedControllerService versionedService = flowMapper.mapControllerService(serviceNode, getFlowController().getControllerServiceProvider(),
-                    Collections.emptySet(), new HashMap<>());
-            versionedServices.add(versionedService);
-            serviceNode.setVersionedComponentId(versionedService.getIdentifier());
-        }
-
-        final Set<VersionedPort> versionedInputPorts = new HashSet<>();
-        for (final Port inputPort : inputPorts) {
-            final VersionedPort versionedInputPort = flowMapper.mapPort(inputPort);
-            versionedInputPorts.add(versionedInputPort);
-            inputPort.setVersionedComponentId(versionedInputPort.getIdentifier());
-        }
-
-        final Set<VersionedPort> versionedOutputPorts = new HashSet<>();
-        for (final Port outputPort : outputPorts) {
-            final VersionedPort versionedOutputPort = flowMapper.mapPort(outputPort);
-            versionedOutputPorts.add(versionedOutputPort);
-            outputPort.setVersionedComponentId(versionedOutputPort.getIdentifier());
-        }
-
-        final Set<VersionedFunnel> versionedFunnels = new HashSet<>();
-        for (final Funnel funnel : funnels) {
-            final VersionedFunnel versionedFunnel = flowMapper.mapFunnel(funnel);
-            versionedFunnels.add(versionedFunnel);
-            funnel.setVersionedComponentId(versionedFunnel.getIdentifier());
-        }
+        InstantiatedVersionedProcessGroup instantiatedVersionedProcessGroup = flowMapper.mapNonVersionedProcessGroup(group, getFlowController().getControllerServiceProvider());
+        final VersionedExternalFlow flow = new VersionedExternalFlow();
+        flow.setFlowContents(instantiatedVersionedProcessGroup);
 
-        final Set<VersionedConnection> versionedConnections = new HashSet<>();
-        for (final Connection connection : connections) {
-            final VersionedConnection versionedConnection = flowMapper.mapConnection(connection);
-            versionedConnections.add(versionedConnection);
-            connection.setVersionedComponentId(versionedConnection.getIdentifier());
+        Map<String, VersionedParameterContext> parameterContexts = new HashMap<>();
+        if (getRootGroup().getParameterContext() != null) {
+            parameterContexts.put(getRootGroup().getParameterContext().getName(), flowMapper.mapParameterContext(getRootGroup().getParameterContext()));
         }
-
-        final VersionedProcessGroup flowContents = createFlowContents();
-        flowContents.setProcessors(versionedProcessors);
-        flowContents.setControllerServices(versionedServices);
-        flowContents.setProcessGroups(versionedProcessGroups);
-        flowContents.setInputPorts(versionedInputPorts);
-        flowContents.setOutputPorts(versionedOutputPorts);
-        flowContents.setFunnels(versionedFunnels);
-        flowContents.setConnections(versionedConnections);
-
-        final VersionedExternalFlow externalFlow = new VersionedExternalFlow();
-
-        final VersionedExternalFlowMetadata metadata = new VersionedExternalFlowMetadata();
-        externalFlow.setMetadata(metadata);
-        metadata.setBucketIdentifier("unit-test-bucket");
-        metadata.setFlowIdentifier("unit-test-flow");
-        metadata.setVersion(1);
-        metadata.setFlowName("unit-test-flow");
-
-        if (parameters != null) {
-            final Set<VersionedParameter> versionedParameters = new HashSet<>();
-            for (final Parameter parameter : parameters) {
-                final VersionedParameter versionedParameter = new VersionedParameter();
-                versionedParameter.setName(parameter.getDescriptor().getName());
-                versionedParameter.setValue(parameter.getValue());
-                versionedParameter.setSensitive(parameter.getDescriptor().isSensitive());
-
-                versionedParameters.add(versionedParameter);
+        Set<ProcessGroup> childProcessGroups = getRootGroup().getProcessGroups();
+        for (ProcessGroup processGroup : childProcessGroups) {
+            if (processGroup.getParameterContext() != null) {
+                parameterContexts.put(processGroup.getParameterContext().getName(), flowMapper.mapParameterContext(processGroup.getParameterContext()));
             }
-
-            final VersionedParameterContext versionedParameterContext = new VersionedParameterContext();
-            versionedParameterContext.setName("Unit Test Context");
-            versionedParameterContext.setParameters(versionedParameters);
-            externalFlow.setParameterContexts(Collections.singletonMap(versionedParameterContext.getName(), versionedParameterContext));
-
-            flowContents.setParameterContextName("Unit Test Context");
         }
+        flow.setParameterContexts(parameterContexts);
 
-        return externalFlow;
-    }
-
-    private VersionedExternalFlow createFlowSnapshot(final List<ControllerServiceNode> controllerServices, final List<ProcessorNode> processors, final Set<Parameter> parameters) {
-        return createFlowSnapshot(null, controllerServices, processors, parameters);
-    }
-
-    private VersionedExternalFlow createFlowSnapshot(final ProcessGroup group) {
-        return createFlowSnapshot(group, Collections.emptyList(), Collections.emptyList(), null);
+        return flow;
     }
 
-
     @NotNull
     private VersionedProcessGroup createFlowContents() {
         final VersionedProcessGroup flowContents = new VersionedProcessGroup();