You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by kd...@apache.org on 2019/02/12 16:13:13 UTC

[nifi] branch master updated: NIFI-5950: Use temp port names during flow updates

This is an automated email from the ASF dual-hosted git repository.

kdoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new a940ff6  NIFI-5950: Use temp port names during flow updates
a940ff6 is described below

commit a940ff634399ded5d97eba4620116bc88ae88d03
Author: Kevin Doran <kd...@apache.org>
AuthorDate: Mon Feb 11 17:08:09 2019 -0500

    NIFI-5950: Use temp port names during flow updates
    
    Use temporary values for port names when updating process groups as part of a Change Flow Version operation.
    This avoids the potential for a name conflict between ports during the update process.
    
    Add a final step to the update process group logic to set the final name on all ports.
    
    This closes #3301.
---
 .../apache/nifi/groups/StandardProcessGroup.java   | 85 ++++++++++++++++++----
 1 file changed, 72 insertions(+), 13 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 61a902a..be49d09 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -1877,7 +1877,6 @@ public final class StandardProcessGroup implements ProcessGroup {
         }
     }
 
-
     private Port getPortByName(final String name, final ProcessGroup group, final PortRetriever retriever) {
         for (final Port port : retriever.getPorts(group)) {
             if (port.getName().equals(name)) {
@@ -3442,6 +3441,15 @@ public final class StandardProcessGroup implements ProcessGroup {
         final Set<String> updatedVersionedComponentIds, final boolean updatePosition, final boolean updateName, final boolean updateDescendantVersionedGroups,
         final Set<String> variablesToSkip) throws ProcessorInstantiationException {
 
+        // During the flow update, we will use temporary names for process group ports. This is because port names must be
+        // unique within a process group, but during an update we might temporarily be in a state where two ports have the same name.
+        // For example, a process group update might involve removing/renaming port A and then adding/updating port B, where B is given
+        // A's former name. This is a valid state by the end of the flow update, but for a brief moment there may be two ports with the
+        // same name. To avoid this conflict, we keep the final names in a map indexed by port id, use a temporary name for each port
+        // during the update, and after all ports have been added/updated/removed, we set the final names on all ports.
+        final Map<String, String> proposedInputPortNamesByPortId = new HashMap<>();
+        final Map<String, String> proposedOutputPortNamesByPortId = new HashMap<>();
+
         group.setComments(proposed.getComments());
 
         if (updateName) {
@@ -3590,11 +3598,15 @@ public final class StandardProcessGroup implements ProcessGroup {
         for (final VersionedPort proposedPort : proposed.getInputPorts()) {
             final Port port = inputPortsByVersionedId.get(proposedPort.getIdentifier());
             if (port == null) {
-                final Port added = addInputPort(group, proposedPort, componentIdSeed);
+                final String temporaryName = generateTemporaryPortName(proposedPort);
+                final Port added = addInputPort(group, proposedPort, componentIdSeed, temporaryName);
+                proposedInputPortNamesByPortId.put(added.getIdentifier(), proposedPort.getName());
                 flowManager.onInputPortAdded(added);
                 LOG.info("Added {} to {}", added, this);
             } else if (updatedVersionedComponentIds.contains(proposedPort.getIdentifier())) {
-                updatePort(port, proposedPort);
+                final String temporaryName = generateTemporaryPortName(proposedPort);
+                proposedInputPortNamesByPortId.put(port.getIdentifier(), proposedPort.getName());
+                updatePort(port, proposedPort, temporaryName);
                 LOG.info("Updated {}", port);
             } else {
                 port.setPosition(new Position(proposedPort.getPosition().getX(), proposedPort.getPosition().getY()));
@@ -3611,11 +3623,15 @@ public final class StandardProcessGroup implements ProcessGroup {
         for (final VersionedPort proposedPort : proposed.getOutputPorts()) {
             final Port port = outputPortsByVersionedId.get(proposedPort.getIdentifier());
             if (port == null) {
-                final Port added = addOutputPort(group, proposedPort, componentIdSeed);
+                final String temporaryName = generateTemporaryPortName(proposedPort);
+                final Port added = addOutputPort(group, proposedPort, componentIdSeed, temporaryName);
+                proposedOutputPortNamesByPortId.put(added.getIdentifier(), proposedPort.getName());
                 flowManager.onOutputPortAdded(added);
                 LOG.info("Added {} to {}", added, this);
             } else if (updatedVersionedComponentIds.contains(proposedPort.getIdentifier())) {
-                updatePort(port, proposedPort);
+                final String temporaryName = generateTemporaryPortName(proposedPort);
+                proposedOutputPortNamesByPortId.put(port.getIdentifier(), proposedPort.getName());
+                updatePort(port, proposedPort, temporaryName);
                 LOG.info("Updated {}", port);
             } else {
                 port.setPosition(new Position(proposedPort.getPosition().getX(), proposedPort.getPosition().getY()));
@@ -3771,6 +3787,31 @@ public final class StandardProcessGroup implements ProcessGroup {
             group.removeOutputPort(port);
         }
 
+        // Now that all obsolete input/output ports have been removed, we should be able to update
+        // all added/updated ports to the final name that was proposed in the new flow version.
+        for (final Map.Entry<String, String> idAndName : proposedInputPortNamesByPortId.entrySet()) {
+            final String portId = idAndName.getKey();
+            final String portFinalName = idAndName.getValue();
+            final Port port = getInputPort(portId);
+            if (port == null) {
+                LOG.warn("Expected to find input port with id={} but it was missing.", portId);
+                continue;
+            }
+            LOG.info("Updating {} to replace temporary name with final name", port);
+            updatePortToSetFinalName(port, portFinalName);
+        }
+        for (final Map.Entry<String, String> idAndName : proposedOutputPortNamesByPortId.entrySet()) {
+            final String portId = idAndName.getKey();
+            final String portFinalName = idAndName.getValue();
+            final Port port = getOutputPort(portId);
+            if (port == null) {
+                LOG.warn("Expected to find output port with id={} but it was missing.", portId);
+                continue;
+            }
+            LOG.info("Updating {} to replace temporary name with final name", port);
+            updatePortToSetFinalName(port, portFinalName);
+        }
+
         for (final String removedVersionedId : labelsRemoved) {
             final Label label = labelsByVersionedId.get(removedVersionedId);
             LOG.info("Removing {} from {}", label, group);
@@ -3810,6 +3851,21 @@ public final class StandardProcessGroup implements ProcessGroup {
         return true;
     }
 
+    private String generateTemporaryPortName(final VersionedPort proposedPort) {
+        final String versionedPortId = proposedPort.getIdentifier();
+        final String proposedPortFinalName = proposedPort.getName();
+        return proposedPortFinalName + " (" + versionedPortId + ")";
+    }
+
+    private void updatePortToSetFinalName(final Port port, final String name) {
+        writeLock.lock();
+        try {
+            port.setName(name);
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
     private String generateUuid(final String propposedId, final String destinationGroupId, final String seed) {
         long msb = UUID.nameUUIDFromBytes((propposedId + destinationGroupId).getBytes(StandardCharsets.UTF_8)).getMostSignificantBits();
 
@@ -4126,26 +4182,29 @@ public final class StandardProcessGroup implements ProcessGroup {
         return funnel;
     }
 
-    private void updatePort(final Port port, final VersionedPort proposed) {
+    private void updatePort(final Port port, final VersionedPort proposed, final String temporaryName) {
+        final String name = temporaryName != null ? temporaryName : proposed.getName();
         port.setComments(proposed.getComments());
-        port.setName(proposed.getName());
+        port.setName(name);
         port.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY()));
     }
 
-    private Port addInputPort(final ProcessGroup destination, final VersionedPort proposed, final String componentIdSeed) {
-        final Port port = flowManager.createLocalInputPort(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), proposed.getName());
+    private Port addInputPort(final ProcessGroup destination, final VersionedPort proposed, final String componentIdSeed, final String temporaryName) {
+        final String name = temporaryName != null ? temporaryName : proposed.getName();
+        final Port port = flowManager.createLocalInputPort(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), name);
         port.setVersionedComponentId(proposed.getIdentifier());
         destination.addInputPort(port);
-        updatePort(port, proposed);
+        updatePort(port, proposed, temporaryName);
 
         return port;
     }
 
-    private Port addOutputPort(final ProcessGroup destination, final VersionedPort proposed, final String componentIdSeed) {
-        final Port port = flowManager.createLocalOutputPort(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), proposed.getName());
+    private Port addOutputPort(final ProcessGroup destination, final VersionedPort proposed, final String componentIdSeed, final String temporaryName) {
+        final String name = temporaryName != null ? temporaryName : proposed.getName();
+        final Port port = flowManager.createLocalOutputPort(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), name);
         port.setVersionedComponentId(proposed.getIdentifier());
         destination.addOutputPort(port);
-        updatePort(port, proposed);
+        updatePort(port, proposed, temporaryName);
 
         return port;
     }