You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by kd...@apache.org on 2019/02/12 16:13:13 UTC
[nifi] branch master updated: NIFI-5950: Use temp port names during
flow updates
This is an automated email from the ASF dual-hosted git repository.
kdoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new a940ff6 NIFI-5950: Use temp port names during flow updates
a940ff6 is described below
commit a940ff634399ded5d97eba4620116bc88ae88d03
Author: Kevin Doran <kd...@apache.org>
AuthorDate: Mon Feb 11 17:08:09 2019 -0500
NIFI-5950: Use temp port names during flow updates
Use temporary values for port names when updating process groups as part of a Change Flow Version operation.
This avoids the potential for a name conflict between ports during the update process.
Add a final step to the update process group logic to set the final name on all ports.
This closes #3301.
---
.../apache/nifi/groups/StandardProcessGroup.java | 85 ++++++++++++++++++----
1 file changed, 72 insertions(+), 13 deletions(-)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 61a902a..be49d09 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -1877,7 +1877,6 @@ public final class StandardProcessGroup implements ProcessGroup {
}
}
-
private Port getPortByName(final String name, final ProcessGroup group, final PortRetriever retriever) {
for (final Port port : retriever.getPorts(group)) {
if (port.getName().equals(name)) {
@@ -3442,6 +3441,15 @@ public final class StandardProcessGroup implements ProcessGroup {
final Set<String> updatedVersionedComponentIds, final boolean updatePosition, final boolean updateName, final boolean updateDescendantVersionedGroups,
final Set<String> variablesToSkip) throws ProcessorInstantiationException {
+ // During the flow update, we will use temporary names for process group ports. This is because port names must be
+ // unique within a process group, but during an update we might temporarily be in a state where two ports have the same name.
+ // For example, a process group update might involve removing/renaming port A, and then adding/updating port B where B is given
+ // A's former name. This is a valid state by the end of the flow update, but for a brief moment there may be two ports with the
+ // same name. To avoid this conflict, we keep the final names in a map indexed by port id, use a temporary name for each port
+ // during the update, and after all ports have been added/updated/removed, we set the final names on all ports.
+ final Map<String, String> proposedInputPortNamesByPortId = new HashMap<>();
+ final Map<String, String> proposedOutputPortNamesByPortId = new HashMap<>();
+
group.setComments(proposed.getComments());
if (updateName) {
@@ -3590,11 +3598,15 @@ public final class StandardProcessGroup implements ProcessGroup {
for (final VersionedPort proposedPort : proposed.getInputPorts()) {
final Port port = inputPortsByVersionedId.get(proposedPort.getIdentifier());
if (port == null) {
- final Port added = addInputPort(group, proposedPort, componentIdSeed);
+ final String temporaryName = generateTemporaryPortName(proposedPort);
+ final Port added = addInputPort(group, proposedPort, componentIdSeed, temporaryName);
+ proposedInputPortNamesByPortId.put(added.getIdentifier(), proposedPort.getName());
flowManager.onInputPortAdded(added);
LOG.info("Added {} to {}", added, this);
} else if (updatedVersionedComponentIds.contains(proposedPort.getIdentifier())) {
- updatePort(port, proposedPort);
+ final String temporaryName = generateTemporaryPortName(proposedPort);
+ proposedInputPortNamesByPortId.put(port.getIdentifier(), proposedPort.getName());
+ updatePort(port, proposedPort, temporaryName);
LOG.info("Updated {}", port);
} else {
port.setPosition(new Position(proposedPort.getPosition().getX(), proposedPort.getPosition().getY()));
@@ -3611,11 +3623,15 @@ public final class StandardProcessGroup implements ProcessGroup {
for (final VersionedPort proposedPort : proposed.getOutputPorts()) {
final Port port = outputPortsByVersionedId.get(proposedPort.getIdentifier());
if (port == null) {
- final Port added = addOutputPort(group, proposedPort, componentIdSeed);
+ final String temporaryName = generateTemporaryPortName(proposedPort);
+ final Port added = addOutputPort(group, proposedPort, componentIdSeed, temporaryName);
+ proposedOutputPortNamesByPortId.put(added.getIdentifier(), proposedPort.getName());
flowManager.onOutputPortAdded(added);
LOG.info("Added {} to {}", added, this);
} else if (updatedVersionedComponentIds.contains(proposedPort.getIdentifier())) {
- updatePort(port, proposedPort);
+ final String temporaryName = generateTemporaryPortName(proposedPort);
+ proposedOutputPortNamesByPortId.put(port.getIdentifier(), proposedPort.getName());
+ updatePort(port, proposedPort, temporaryName);
LOG.info("Updated {}", port);
} else {
port.setPosition(new Position(proposedPort.getPosition().getX(), proposedPort.getPosition().getY()));
@@ -3771,6 +3787,31 @@ public final class StandardProcessGroup implements ProcessGroup {
group.removeOutputPort(port);
}
+ // Now that all input/output ports have been removed, we should be able to update
+ // all ports to the final name that was proposed in the new flow version.
+ for (final Map.Entry<String, String> idAndName : proposedInputPortNamesByPortId.entrySet()) {
+ final String portId = idAndName.getKey();
+ final String portFinalName = idAndName.getValue();
+ final Port port = getInputPort(portId);
+ if (port == null) {
+ LOG.warn("Expected to find input port with id={} but it was missing.", portId);
+ continue;
+ }
+ LOG.info("Updating {} to replace temporary name with final name", port);
+ updatePortToSetFinalName(port, portFinalName);
+ }
+ for (final Map.Entry<String, String> idAndName : proposedOutputPortNamesByPortId.entrySet()) {
+ final String portId = idAndName.getKey();
+ final String portFinalName = idAndName.getValue();
+ final Port port = getOutputPort(portId);
+ if (port == null) {
+ LOG.warn("Expected to find output port with id={} but it was missing.", portId);
+ continue;
+ }
+ LOG.info("Updating {} to replace temporary name with final name", port);
+ updatePortToSetFinalName(port, portFinalName);
+ }
+
for (final String removedVersionedId : labelsRemoved) {
final Label label = labelsByVersionedId.get(removedVersionedId);
LOG.info("Removing {} from {}", label, group);
@@ -3810,6 +3851,21 @@ public final class StandardProcessGroup implements ProcessGroup {
return true;
}
+ private String generateTemporaryPortName(final VersionedPort proposedPort) {
+ final String versionedPortId = proposedPort.getIdentifier();
+ final String proposedPortFinalName = proposedPort.getName();
+ return proposedPortFinalName + " (" + versionedPortId + ")";
+ }
+
+ private void updatePortToSetFinalName(final Port port, final String name) {
+ writeLock.lock();
+ try {
+ port.setName(name);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
private String generateUuid(final String propposedId, final String destinationGroupId, final String seed) {
long msb = UUID.nameUUIDFromBytes((propposedId + destinationGroupId).getBytes(StandardCharsets.UTF_8)).getMostSignificantBits();
@@ -4126,26 +4182,29 @@ public final class StandardProcessGroup implements ProcessGroup {
return funnel;
}
- private void updatePort(final Port port, final VersionedPort proposed) {
+ private void updatePort(final Port port, final VersionedPort proposed, final String temporaryName) {
+ final String name = temporaryName != null ? temporaryName : proposed.getName();
port.setComments(proposed.getComments());
- port.setName(proposed.getName());
+ port.setName(name);
port.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY()));
}
- private Port addInputPort(final ProcessGroup destination, final VersionedPort proposed, final String componentIdSeed) {
- final Port port = flowManager.createLocalInputPort(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), proposed.getName());
+ private Port addInputPort(final ProcessGroup destination, final VersionedPort proposed, final String componentIdSeed, final String temporaryName) {
+ final String name = temporaryName != null ? temporaryName : proposed.getName();
+ final Port port = flowManager.createLocalInputPort(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), name);
port.setVersionedComponentId(proposed.getIdentifier());
destination.addInputPort(port);
- updatePort(port, proposed);
+ updatePort(port, proposed, temporaryName);
return port;
}
- private Port addOutputPort(final ProcessGroup destination, final VersionedPort proposed, final String componentIdSeed) {
- final Port port = flowManager.createLocalOutputPort(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), proposed.getName());
+ private Port addOutputPort(final ProcessGroup destination, final VersionedPort proposed, final String componentIdSeed, final String temporaryName) {
+ final String name = temporaryName != null ? temporaryName : proposed.getName();
+ final Port port = flowManager.createLocalOutputPort(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), name);
port.setVersionedComponentId(proposed.getIdentifier());
destination.addOutputPort(port);
- updatePort(port, proposed);
+ updatePort(port, proposed, temporaryName);
return port;
}