You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2022/04/08 14:08:42 UTC
[nifi] branch main updated: NIFI-9875 In StandardProcessGroupSynchronizer.updateConnectionDestinations handle special case when output port is moved to a child process group while kept connected. (#5931)
This is an automated email from the ASF dual-hosted git repository.
markap14 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 1cf4e72084 NIFI-9875 In StandardProcessGroupSynchronizer.updateConnectionDestinations handle special case when output port is moved to a child process group while kept connected. (#5931)
1cf4e72084 is described below
commit 1cf4e72084f30707ffb0676f77355a810c2d6e9e
Author: tpalfy <53...@users.noreply.github.com>
AuthorDate: Fri Apr 8 16:08:31 2022 +0200
NIFI-9875 In StandardProcessGroupSynchronizer.updateConnectionDestinations handle special case when output port is moved to a child process group while kept connected. (#5931)
---
.../groups/StandardProcessGroupSynchronizer.java | 10 +-
.../nifi/integration/versioned/ImportFlowIT.java | 281 +++++++--------------
2 files changed, 103 insertions(+), 188 deletions(-)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java
index cefaf22ada..8c5592dc18 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java
@@ -250,7 +250,9 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
// Ensure that we create all Parameter Contexts before updating them. This is necessary in case the proposed incoming dataflow has
// parameter contexts that inherit from one another and neither the inheriting nor inherited parameter context exists.
- versionedParameterContexts.values().forEach(this::createParameterContextWithoutReferences);
+ if (versionedParameterContexts != null) {
+ versionedParameterContexts.values().forEach(this::createParameterContextWithoutReferences);
+ }
updateParameterContext(group, proposed, versionedParameterContexts, context.getComponentIdGenerator());
updateVariableRegistry(group, proposed);
@@ -559,7 +561,11 @@ public class StandardProcessGroupSynchronizer implements ProcessGroupSynchronize
// Find the destination of the connection. If the destination doesn't yet exist (because it's part of the proposed Process Group but not yet added),
// we will set the destination to a temporary destination. Then, after adding components, we will update the destinations again.
Connectable newDestination = getConnectable(group, proposedConnection.getDestination());
- if (newDestination == null) {
+ if (
+ newDestination == null
+ ||
+ (newDestination.getConnectableType() == ConnectableType.OUTPUT_PORT && !newDestination.getProcessGroup().equals(connection.getProcessGroup()))
+ ) {
final Funnel temporaryDestination = getTemporaryFunnel(connection.getProcessGroup());
LOG.debug("Updated Connection {} to have a temporary destination of {}", connection, temporaryDestination);
newDestination = temporaryDestination;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/versioned/ImportFlowIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/versioned/ImportFlowIT.java
index 92f711e87a..2c82165e67 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/versioned/ImportFlowIT.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/versioned/ImportFlowIT.java
@@ -27,16 +27,9 @@ import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.StandardSnippet;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.flow.Bundle;
-import org.apache.nifi.flow.VersionedConnection;
-import org.apache.nifi.flow.VersionedControllerService;
import org.apache.nifi.flow.VersionedExternalFlow;
-import org.apache.nifi.flow.VersionedExternalFlowMetadata;
-import org.apache.nifi.flow.VersionedFunnel;
-import org.apache.nifi.flow.VersionedParameter;
import org.apache.nifi.flow.VersionedParameterContext;
-import org.apache.nifi.flow.VersionedPort;
import org.apache.nifi.flow.VersionedProcessGroup;
-import org.apache.nifi.flow.VersionedProcessor;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.integration.DirectInjectionExtensionManager;
import org.apache.nifi.integration.FrameworkIntegrationTest;
@@ -61,12 +54,13 @@ import org.apache.nifi.registry.flow.diff.FlowComparison;
import org.apache.nifi.registry.flow.diff.FlowDifference;
import org.apache.nifi.registry.flow.diff.StandardComparableDataFlow;
import org.apache.nifi.registry.flow.diff.StandardFlowComparator;
+import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup;
import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
import org.apache.nifi.util.FlowDifferenceFilters;
import org.jetbrains.annotations.NotNull;
import org.junit.Test;
-import java.util.ArrayList;
+import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -85,6 +79,14 @@ import static org.testng.Assert.assertNotNull;
import static org.testng.AssertJUnit.assertNull;
public class ImportFlowIT extends FrameworkIntegrationTest {
+ @Override
+ public void setup() throws IOException {
+ super.setup();
+
+ for (ProcessorNode processor : getRootGroup().getProcessors()) {
+ getRootGroup().removeProcessor(processor);
+ }
+ }
@Override
protected void injectExtensionTypes(final DirectInjectionExtensionManager extensionManager) {
@@ -103,7 +105,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
processor.setAutoTerminatedRelationships(Collections.singleton(REL_SUCCESS));
processor.setProperties(Collections.singletonMap(NopServiceReferencingProcessor.SERVICE.getName(), controllerService.getIdentifier()));
- final VersionedExternalFlow proposedFlow = createFlowSnapshot(Collections.singletonList(controllerService), Collections.singletonList(processor), null);
+ final VersionedExternalFlow proposedFlow = createFlowSnapshot();
// Create an Inner Process Group and update it to match the Versioned Flow.
final ProcessGroup innerGroup = getFlowController().getFlowManager().createProcessGroup("inner-group-id");
@@ -147,7 +149,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
processor.setProperties(Collections.singletonMap(UsernamePasswordProcessor.PASSWORD.getName(), "password"));
// Create a VersionedExternalFlow that contains the processor
- final VersionedExternalFlow versionedFlowWithExplicitValue = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(processor), null);
+ final VersionedExternalFlow versionedFlowWithExplicitValue = createFlowSnapshot();
// Create child group
final ProcessGroup innerGroup = getFlowController().getFlowManager().createProcessGroup("inner-group-id");
@@ -173,7 +175,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
assertEquals(DifferenceType.PROPERTY_PARAMETERIZED, differences.iterator().next().getDifferenceType());
// Create a Versioned Flow that contains the Parameter Reference.
- final VersionedExternalFlow versionedFlowWithParameterReference = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(processor), null);
+ final VersionedExternalFlow versionedFlowWithParameterReference = createFlowSnapshot(innerGroup);
// Ensure no difference between the current configuration and the versioned flow
differences = getLocalModifications(innerGroup, versionedFlowWithParameterReference);
@@ -193,7 +195,8 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
// Create a VersionedExternalFlow that contains the processor
final Parameter parameter = new Parameter(new ParameterDescriptor.Builder().name("secret-param").sensitive(true).build(), null);
- final VersionedExternalFlow versionedFlowWithParameterReference = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(processor), Collections.singleton(parameter));
+ setParameter(parameter);
+ final VersionedExternalFlow versionedFlowWithParameterReference = createFlowSnapshot();
// Create child group
final ProcessGroup innerGroup = getFlowController().getFlowManager().createProcessGroup("inner-group-id");
@@ -225,7 +228,8 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
// Create a VersionedExternalFlow that contains the processor
final Parameter parameter = new Parameter(new ParameterDescriptor.Builder().name("secret-param").sensitive(true).build(), null);
- final VersionedExternalFlow versionedFlowWithParameterReference = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(processor), Collections.singleton(parameter));
+ setParameter(parameter);
+ final VersionedExternalFlow versionedFlowWithParameterReference = createFlowSnapshot();
// Create child group
final ProcessGroup innerGroup = getFlowController().getFlowManager().createProcessGroup("inner-group-id");
@@ -255,13 +259,12 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
// Create a VersionedExternalFlow that contains the processor
final Parameter parameter = new Parameter(new ParameterDescriptor.Builder().name("secret-param").sensitive(true).build(), null);
- final VersionedExternalFlow versionedFlowWithParameterReference = createFlowSnapshot(Collections.emptyList(),
- Collections.singletonList(initialProcessor), Collections.singleton(parameter));
-
+ setParameter(parameter);
+ final VersionedExternalFlow versionedFlowWithParameterReference = createFlowSnapshot();
// Update processor to have an explicit value for the second version of the flow.
initialProcessor.setProperties(Collections.singletonMap(UsernamePasswordProcessor.PASSWORD.getName(), "secret-value"));
- final VersionedExternalFlow versionedFlowExplicitValue = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(initialProcessor), null);
+ final VersionedExternalFlow versionedFlowExplicitValue = createFlowSnapshot();
// Create child group and update to the first version of the flow, with parameter ref
final ProcessGroup innerGroup = getFlowController().getFlowManager().createProcessGroup("inner-group-id");
@@ -290,7 +293,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
initialProperties.put(UsernamePasswordProcessor.PASSWORD.getName(), "pass");
initialProcessor.setProperties(initialProperties);
- final VersionedExternalFlow initialVersionSnapshot = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(initialProcessor), null);
+ final VersionedExternalFlow initialVersionSnapshot = createFlowSnapshot();
// Update processor to have a different explicit value for both sensitive and non-sensitive properties and create a versioned flow for it.
final Map<String, String> updatedProperties = new HashMap<>();
@@ -298,7 +301,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
updatedProperties.put(UsernamePasswordProcessor.PASSWORD.getName(), "pass");
initialProcessor.setProperties(updatedProperties);
- final VersionedExternalFlow updatedVersionSnapshot = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(initialProcessor), null);
+ final VersionedExternalFlow updatedVersionSnapshot = createFlowSnapshot();
// Create child group and update to the first version of the flow, with parameter ref
final ProcessGroup innerGroup = getFlowController().getFlowManager().createProcessGroup("inner-group-id");
@@ -335,7 +338,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
initialProperties.put(UsernamePasswordProcessor.PASSWORD.getName(), "#{secret-param}");
initialProcessor.setProperties(initialProperties);
- final VersionedExternalFlow initialVersionSnapshot = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(initialProcessor), null);
+ final VersionedExternalFlow initialVersionSnapshot = createFlowSnapshot();
// Update processor to have a different explicit value for both sensitive and non-sensitive properties and create a versioned flow for it.
final Map<String, String> updatedProperties = new HashMap<>();
@@ -343,7 +346,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
updatedProperties.put(UsernamePasswordProcessor.PASSWORD.getName(), "#{other-param}");
initialProcessor.setProperties(updatedProperties);
- final VersionedExternalFlow updatedVersionSnapshot = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(initialProcessor), null);
+ final VersionedExternalFlow updatedVersionSnapshot = createFlowSnapshot();
// Create child group and update to the first version of the flow, with parameter ref
final ProcessGroup innerGroup = getFlowController().getFlowManager().createProcessGroup("inner-group-id");
@@ -362,19 +365,16 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
@Test
public void testChangeVersionFromExplicitValueToParameterSensitiveProperty() {
// Create a processor with a sensitive property
- final ProcessorNode processorWithParamRef = createProcessorNode(UsernamePasswordProcessor.class);
- processorWithParamRef.setProperties(Collections.singletonMap(UsernamePasswordProcessor.PASSWORD.getName(), "#{secret-param}"));
-
- final ProcessorNode processorWithExplicitValue = createProcessorNode(UsernamePasswordProcessor.class);
- processorWithExplicitValue.setProperties(Collections.singletonMap(UsernamePasswordProcessor.PASSWORD.getName(), "secret-value"));
-
+ final ProcessorNode processor = createProcessorNode(UsernamePasswordProcessor.class);
+ processor.setProperties(Collections.singletonMap(UsernamePasswordProcessor.PASSWORD.getName(), "#{secret-param}"));
// Create a VersionedExternalFlow that contains the processor
final Parameter parameter = new Parameter(new ParameterDescriptor.Builder().name("secret-param").sensitive(true).build(), null);
- final VersionedExternalFlow versionedFlowWithParameterReference = createFlowSnapshot(Collections.emptyList(),
- Collections.singletonList(processorWithParamRef), Collections.singleton(parameter));
+ setParameter(parameter);
+ final VersionedExternalFlow versionedFlowWithParameterReference = createFlowSnapshot();
- final VersionedExternalFlow versionedFlowExplicitValue = createFlowSnapshot(Collections.emptyList(), Collections.singletonList(processorWithExplicitValue), null);
+ processor.setProperties(Collections.singletonMap(UsernamePasswordProcessor.PASSWORD.getName(), "secret-value"));
+ final VersionedExternalFlow versionedFlowExplicitValue = createFlowSnapshot();
// Create child group and update to the first version of the flow, with parameter ref
final ProcessGroup innerGroup = getFlowController().getFlowManager().createProcessGroup("inner-group-id");
@@ -386,13 +386,13 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
final ProcessorNode nodeInGroupWithRef = innerGroup.getProcessors().iterator().next();
assertNotNull(nodeInGroupWithRef.getProperty(UsernamePasswordProcessor.PASSWORD));
-
// Update the flow to new version that uses explicit value.
innerGroup.updateFlow(versionedFlowWithParameterReference, (String) null, true, true, true);
// Updated flow has sensitive property that no longer references parameter. Now is an explicit value, so it should be unset
final ProcessorNode nodeInGroupWithNoValue = innerGroup.getProcessors().iterator().next();
- assertEquals("#{secret-param}", nodeInGroupWithNoValue.getProperty(UsernamePasswordProcessor.PASSWORD).getRawValue());
+ String actual = nodeInGroupWithNoValue.getProperty(UsernamePasswordProcessor.PASSWORD).getRawValue();
+ assertEquals("#{secret-param}", actual);
}
@Test
@@ -431,18 +431,18 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
assertTrue(groupA.getProcessors().isEmpty());
assertTrue(groupA.getConnections().isEmpty());
assertEquals(1, groupA.getInputPorts().size());
- assertEquals(port.getVersionedComponentId(), groupA.getInputPorts().stream().findFirst().get().getVersionedComponentId());
+ assertEquals(port.getName(), groupA.getInputPorts().stream().findFirst().get().getName());
//Change Process Group A version to Version 2
groupA.updateFlow(version2, null, false, true, true);
//Process Group A should have a Process Group, a Processor and a Connection and no Input Ports
assertEquals(1, groupA.getProcessGroups().size());
- assertEquals(groupB.getVersionedComponentId(), groupA.getProcessGroups().stream().findFirst().get().getVersionedComponentId());
+ assertEquals(groupB.getName(), groupA.getProcessGroups().stream().findFirst().get().getName());
assertEquals(1, groupA.getProcessors().size());
- assertEquals(processor.getVersionedComponentId(), groupA.getProcessors().stream().findFirst().get().getVersionedComponentId());
+ assertEquals(processor.getName(), groupA.getProcessors().stream().findFirst().get().getName());
assertEquals(1, groupA.getConnections().size());
- assertEquals(connection.getVersionedComponentId(), groupA.getConnections().stream().findFirst().get().getVersionedComponentId());
+ assertEquals(connection.getName(), groupA.getConnections().stream().findFirst().get().getName());
assertTrue(groupA.getInputPorts().isEmpty());
}
@@ -487,9 +487,9 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
assertEquals(1, group.getConnections().size());
assertEquals(connection.getVersionedComponentId(), group.getConnections().stream().findFirst().get().getVersionedComponentId());
assertEquals(1, group.getOutputPorts().size());
- assertEquals(port.getVersionedComponentId(), group.getOutputPorts().stream().findFirst().get().getVersionedComponentId());
+ assertEquals(port.getName(), group.getOutputPorts().stream().findFirst().get().getName());
assertTrue(group.getFunnels().isEmpty());
- assertEquals(connection.getDestination().getVersionedComponentId(), port.getVersionedComponentId());
+ assertEquals(connection.getDestination().getName(), port.getName());
//Change Process Group version to Version 2
group.updateFlow(version2, null, false, true, true);
@@ -497,12 +497,12 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
//Process Group should have a Funnel, a Processor, a Connection and no Output Ports
assertTrue(group.getOutputPorts().isEmpty());
assertEquals(1, group.getProcessors().size());
- assertEquals(processor.getVersionedComponentId(), group.getProcessors().stream().findFirst().get().getVersionedComponentId());
+ assertEquals(processor.getName(), group.getProcessors().stream().findFirst().get().getName());
assertEquals(1, group.getConnections().size());
- assertEquals(connection.getVersionedComponentId(), group.getConnections().stream().findFirst().get().getVersionedComponentId());
+ assertEquals(connection.getName(), group.getConnections().stream().findFirst().get().getName());
assertEquals(1, group.getFunnels().size());
- assertEquals(funnel.getVersionedComponentId(), group.getFunnels().stream().findFirst().get().getVersionedComponentId());
- assertEquals(connection.getDestination().getVersionedComponentId(), funnel.getVersionedComponentId());
+ assertEquals(funnel.getName(), group.getFunnels().stream().findFirst().get().getName());
+ assertEquals(connection.getDestination().getName(), funnel.getName());
}
@Test
@@ -511,45 +511,45 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
final ProcessGroup groupA = createProcessGroup("group-a-id", "Group A", getRootGroup());
//Create Process Group B under Process Group A
- final ProcessGroup groupB = createProcessGroup("group-b-id", "Group B", groupA);
+ final ProcessGroup groupBunderA = createProcessGroup("group-b-id", "Group B", groupA);
//Add Input port under Process Group B
- final Port inputPort = getFlowController().getFlowManager().createLocalInputPort("input-port-id", "Input Port");
- groupB.addInputPort(inputPort);
+ final Port inputPortBThenStayThenDelete = getFlowController().getFlowManager().createLocalInputPort("input-port-id", "Input Port");
+ groupBunderA.addInputPort(inputPortBThenStayThenDelete);
//Add Processor 1 under Process Group A
- final ProcessorNode processor1 = createProcessorNode(GenerateProcessor.class, groupA);
+ final ProcessorNode processorA1 = createProcessorNode(GenerateProcessor.class, groupA);
//Add Processor 2 under Process Group A
- final ProcessorNode processor2 = createProcessorNode(GenerateProcessor.class, groupA);
+ final ProcessorNode processorA2 = createProcessorNode(GenerateProcessor.class, groupA);
//Add Output Port under Process Group A
- final Port outputPort = getFlowController().getFlowManager().createLocalOutputPort("output-port-id", "Output Port");
- groupA.addOutputPort(outputPort);
+ final Port outputPortAThenB = getFlowController().getFlowManager().createLocalOutputPort("output-port-id", "Output Port");
+ groupA.addOutputPort(outputPortAThenB);
//Connect Processor 1 and Output Port as Connection 1
- final Connection connection1 = connect(groupA, processor1, outputPort, processor1.getRelationships());
+ final Connection connectionProcessorA1ToOutputPortAThenProcessorA2 = connect(groupA, processorA1, outputPortAThenB, processorA1.getRelationships());
//Connect Processor 1 and Input Port as Connection 2
- final Connection connection2 = connect(groupA, processor1, inputPort, processor1.getRelationships());
+ final Connection connectionProcessorA1ToInputPortBThenStayThenDelete = connect(groupA, processorA1, inputPortBThenStayThenDelete, processorA1.getRelationships());
//Create a snapshot
final VersionedExternalFlow version1 = createFlowSnapshot(groupA);
//Modify Connection 1 to point to Processor 2
- connection1.setDestination(processor2);
+ connectionProcessorA1ToOutputPortAThenProcessorA2.setDestination(processorA2);
//Move Output Port to Process Group B
- moveOutputPort(outputPort, groupB);
+ moveOutputPort(outputPortAThenB, groupBunderA);
//Create another snapshot
final VersionedExternalFlow version2 = createFlowSnapshot(groupA);
//Delete connection 2
- groupA.removeConnection(connection2);
+ groupA.removeConnection(connectionProcessorA1ToInputPortBThenStayThenDelete);
//Delete Input Port
- groupB.removeInputPort(inputPort);
+ groupBunderA.removeInputPort(inputPortBThenStayThenDelete);
//Create another snapshot
final VersionedExternalFlow version3 = createFlowSnapshot(groupA);
@@ -560,27 +560,27 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
//Process Group A should have two Processors, 2 Connections, one Output Port and one Process Group with one Input Port
assertEquals(2, groupA.getProcessors().size());
assertEquals(2, groupA.getConnections().size());
- assertEquals(connection1.getDestination().getVersionedComponentId(), outputPort.getVersionedComponentId());
+ assertEquals(connectionProcessorA1ToOutputPortAThenProcessorA2.getDestination().getName(), outputPortAThenB.getName());
assertEquals(1, groupA.getOutputPorts().size());
assertEquals(1, groupA.getProcessGroups().size());
- assertEquals(1, groupB.getInputPorts().size());
+ assertEquals(1, groupBunderA.getInputPorts().size());
//Change Process Group version to Version 2
groupA.updateFlow(version2, null, false, true, true);
//Connection1 destination changed to Processor2 and Output Port moved to Process Group B
assertTrue(groupA.getOutputPorts().isEmpty());
- assertEquals(connection1.getDestination().getVersionedComponentId(), processor2.getVersionedComponentId());
- assertEquals(1, groupB.getOutputPorts().size());
- assertEquals(outputPort.getVersionedComponentId(), groupB.getOutputPorts().stream().findFirst().get().getVersionedComponentId());
+ assertEquals(connectionProcessorA1ToOutputPortAThenProcessorA2.getDestination().getName(), processorA2.getName());
+ assertEquals(1, groupBunderA.getOutputPorts().size());
+ assertEquals(outputPortAThenB.getName(), groupBunderA.getOutputPorts().stream().findFirst().get().getName());
//Change Process Group version to Version 3
groupA.updateFlow(version3, null, false, true, true);
//Connection2 and Input Port should be deleted
assertEquals(1, groupA.getConnections().size());
- assertEquals(connection1.getVersionedComponentId(), groupA.getConnections().stream().findFirst().get().getVersionedComponentId());
- assertTrue(groupB.getInputPorts().isEmpty());
+ assertEquals(connectionProcessorA1ToOutputPortAThenProcessorA2.getName(), groupA.getConnections().stream().findFirst().get().getName());
+ assertTrue(groupBunderA.getInputPorts().isEmpty());
}
@Test
@@ -638,7 +638,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
//Process Group A should have two Processors, 2 Connections, one Output Port and one Process Group with one Input Port
assertEquals(2, groupA.getProcessors().size());
assertEquals(2, groupA.getConnections().size());
- assertEquals(connection1.getDestination().getVersionedComponentId(), outputPort.getVersionedComponentId());
+ assertEquals(connection1.getDestination().getName(), outputPort.getName());
assertEquals(1, groupA.getOutputPorts().size());
assertEquals(1, groupA.getProcessGroups().size());
assertEquals(1, groupB.getInputPorts().size());
@@ -647,7 +647,7 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
groupA.updateFlow(version2, null, false, true, true);
//Connection1 destination changed to Processor2 and Output Port deleted
- assertEquals(connection1.getDestination().getVersionedComponentId(), processor2.getVersionedComponentId());
+ assertEquals(connection1.getDestination().getName(), processor2.getName());
assertTrue(groupA.getOutputPorts().isEmpty());
assertTrue(groupB.getOutputPorts().isEmpty());
@@ -656,10 +656,22 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
//Connection2 should be deleted and Input Port moved to Process Group A
assertEquals(1, groupA.getConnections().size());
- assertEquals(connection1.getVersionedComponentId(), groupA.getConnections().stream().findFirst().get().getVersionedComponentId());
+ assertEquals(connection1.getName(), groupA.getConnections().stream().findFirst().get().getName());
assertTrue(groupB.getInputPorts().isEmpty());
assertEquals(1, groupA.getInputPorts().size());
- assertEquals(inputPort.getVersionedComponentId(), groupA.getInputPorts().stream().findFirst().get().getVersionedComponentId());
+ assertEquals(inputPort.getName(), groupA.getInputPorts().stream().findFirst().get().getName());
+ }
+
+ private void setParameter(Parameter parameter) {
+ ParameterContext rootParameterContext = getFlowController().getFlowManager().getParameterContextManager().getParameterContext("unimportant");
+ if (rootParameterContext == null) {
+ rootParameterContext = getFlowController().getFlowManager().createParameterContext("unimportant", "unimportant", Collections.emptyMap(), Collections.emptyList());
+ getRootGroup().setParameterContext(rootParameterContext);
+ }
+
+ Map<String, Parameter> parameterMap = new HashMap<>();
+ parameterMap.put(parameter.getDescriptor().getName(), parameter);
+ rootParameterContext.setParameters(parameterMap);
}
private ProcessGroup createProcessGroup(final String groupId, final String groupName, final ProcessGroup destination) {
@@ -707,137 +719,34 @@ public class ImportFlowIT extends FrameworkIntegrationTest {
return differences;
}
- private VersionedExternalFlow createFlowSnapshot(final ProcessGroup group, final List<ControllerServiceNode> controllerServices,
- final List<ProcessorNode> processors, final Set<Parameter> parameters) {
+ private VersionedExternalFlow createFlowSnapshot() {
+ return createFlowSnapshot(getRootGroup());
+ }
+
+ private VersionedExternalFlow createFlowSnapshot(final ProcessGroup group) {
createBundle();
final NiFiRegistryFlowMapper flowMapper = new NiFiRegistryFlowMapper(getExtensionManager());
- final List<ProcessorNode> processorNodes;
- final List<ControllerServiceNode> controllerServiceNodes;
- final List<Port> inputPorts;
- final List<Port> outputPorts;
- final List<Funnel> funnels;
- final List<Connection> connections;
- final List<ProcessGroup> processGroups;
- final Set<VersionedProcessGroup> versionedProcessGroups;
-
- if (group == null) {
- processorNodes = processors;
- controllerServiceNodes = controllerServices;
- inputPorts = Collections.EMPTY_LIST;
- outputPorts = Collections.EMPTY_LIST;
- funnels = Collections.EMPTY_LIST;
- connections = Collections.EMPTY_LIST;
- versionedProcessGroups = Collections.EMPTY_SET;
- } else {
- processorNodes = new ArrayList<>(group.getProcessors());
- controllerServiceNodes = new ArrayList<>(group.getControllerServices(false));
- inputPorts = new ArrayList<>(group.getInputPorts());
- outputPorts = new ArrayList<>(group.getOutputPorts());
- funnels = new ArrayList<>(group.getFunnels());
- connections = new ArrayList<>(group.getConnections());
- processGroups = new ArrayList<>(group.getProcessGroups());
-
- final VersionedProcessGroup versionedGroup = flowMapper.mapProcessGroup(group, getFlowController().getControllerServiceProvider(),getFlowController().getFlowRegistryClient(),true);
- processGroups.forEach(processGroup->
- versionedGroup.getProcessGroups().stream().filter(versionedProcessGroup -> versionedProcessGroup.getName().equals(processGroup.getName()))
- .forEach(filteredProcessGroup -> processGroup.setVersionedComponentId(filteredProcessGroup.getIdentifier())));
- versionedProcessGroups = new HashSet<>(versionedGroup.getProcessGroups());
- }
-
- final Set<VersionedProcessor> versionedProcessors = new HashSet<>();
- for (final ProcessorNode processor : processorNodes) {
- final VersionedProcessor versionedProcessor = flowMapper.mapProcessor(processor, getFlowController().getControllerServiceProvider(), Collections.emptySet(), new HashMap<>());
- versionedProcessors.add(versionedProcessor);
- processor.setVersionedComponentId(versionedProcessor.getIdentifier());
- }
-
- final Set<VersionedControllerService> versionedServices = new HashSet<>();
- for (final ControllerServiceNode serviceNode : controllerServiceNodes) {
- final VersionedControllerService versionedService = flowMapper.mapControllerService(serviceNode, getFlowController().getControllerServiceProvider(),
- Collections.emptySet(), new HashMap<>());
- versionedServices.add(versionedService);
- serviceNode.setVersionedComponentId(versionedService.getIdentifier());
- }
-
- final Set<VersionedPort> versionedInputPorts = new HashSet<>();
- for (final Port inputPort : inputPorts) {
- final VersionedPort versionedInputPort = flowMapper.mapPort(inputPort);
- versionedInputPorts.add(versionedInputPort);
- inputPort.setVersionedComponentId(versionedInputPort.getIdentifier());
- }
-
- final Set<VersionedPort> versionedOutputPorts = new HashSet<>();
- for (final Port outputPort : outputPorts) {
- final VersionedPort versionedOutputPort = flowMapper.mapPort(outputPort);
- versionedOutputPorts.add(versionedOutputPort);
- outputPort.setVersionedComponentId(versionedOutputPort.getIdentifier());
- }
-
- final Set<VersionedFunnel> versionedFunnels = new HashSet<>();
- for (final Funnel funnel : funnels) {
- final VersionedFunnel versionedFunnel = flowMapper.mapFunnel(funnel);
- versionedFunnels.add(versionedFunnel);
- funnel.setVersionedComponentId(versionedFunnel.getIdentifier());
- }
+ InstantiatedVersionedProcessGroup instantiatedVersionedProcessGroup = flowMapper.mapNonVersionedProcessGroup(group, getFlowController().getControllerServiceProvider());
+ final VersionedExternalFlow flow = new VersionedExternalFlow();
+ flow.setFlowContents(instantiatedVersionedProcessGroup);
- final Set<VersionedConnection> versionedConnections = new HashSet<>();
- for (final Connection connection : connections) {
- final VersionedConnection versionedConnection = flowMapper.mapConnection(connection);
- versionedConnections.add(versionedConnection);
- connection.setVersionedComponentId(versionedConnection.getIdentifier());
+ Map<String, VersionedParameterContext> parameterContexts = new HashMap<>();
+ if (getRootGroup().getParameterContext() != null) {
+ parameterContexts.put(getRootGroup().getParameterContext().getName(), flowMapper.mapParameterContext(getRootGroup().getParameterContext()));
}
-
- final VersionedProcessGroup flowContents = createFlowContents();
- flowContents.setProcessors(versionedProcessors);
- flowContents.setControllerServices(versionedServices);
- flowContents.setProcessGroups(versionedProcessGroups);
- flowContents.setInputPorts(versionedInputPorts);
- flowContents.setOutputPorts(versionedOutputPorts);
- flowContents.setFunnels(versionedFunnels);
- flowContents.setConnections(versionedConnections);
-
- final VersionedExternalFlow externalFlow = new VersionedExternalFlow();
-
- final VersionedExternalFlowMetadata metadata = new VersionedExternalFlowMetadata();
- externalFlow.setMetadata(metadata);
- metadata.setBucketIdentifier("unit-test-bucket");
- metadata.setFlowIdentifier("unit-test-flow");
- metadata.setVersion(1);
- metadata.setFlowName("unit-test-flow");
-
- if (parameters != null) {
- final Set<VersionedParameter> versionedParameters = new HashSet<>();
- for (final Parameter parameter : parameters) {
- final VersionedParameter versionedParameter = new VersionedParameter();
- versionedParameter.setName(parameter.getDescriptor().getName());
- versionedParameter.setValue(parameter.getValue());
- versionedParameter.setSensitive(parameter.getDescriptor().isSensitive());
-
- versionedParameters.add(versionedParameter);
+ Set<ProcessGroup> childProcessGroups = getRootGroup().getProcessGroups();
+ for (ProcessGroup processGroup : childProcessGroups) {
+ if (processGroup.getParameterContext() != null) {
+ parameterContexts.put(processGroup.getParameterContext().getName(), flowMapper.mapParameterContext(processGroup.getParameterContext()));
}
-
- final VersionedParameterContext versionedParameterContext = new VersionedParameterContext();
- versionedParameterContext.setName("Unit Test Context");
- versionedParameterContext.setParameters(versionedParameters);
- externalFlow.setParameterContexts(Collections.singletonMap(versionedParameterContext.getName(), versionedParameterContext));
-
- flowContents.setParameterContextName("Unit Test Context");
}
+ flow.setParameterContexts(parameterContexts);
- return externalFlow;
- }
-
- private VersionedExternalFlow createFlowSnapshot(final List<ControllerServiceNode> controllerServices, final List<ProcessorNode> processors, final Set<Parameter> parameters) {
- return createFlowSnapshot(null, controllerServices, processors, parameters);
- }
-
- private VersionedExternalFlow createFlowSnapshot(final ProcessGroup group) {
- return createFlowSnapshot(group, Collections.emptyList(), Collections.emptyList(), null);
+ return flow;
}
-
@NotNull
private VersionedProcessGroup createFlowContents() {
final VersionedProcessGroup flowContents = new VersionedProcessGroup();