You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2022/08/22 13:07:45 UTC

[nifi] branch main updated: NIFI-10371: When a component is moved between groups, ensure that its versioned component id is unique within the destination group. Also ensure that when adding a connection to a PG with the VersionedComponentSynchronizer that we prefer obtaining source/destination by instance id instead of versioned id.

This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new b012e9aad2 NIFI-10371: When a component is moved between groups, ensure that its versioned component id is unique within the destination group. Also ensure that when adding a connection to a PG with the VersionedComponentSynchronizer that we prefer obtaining source/destination by instance id instead of versioned id.
b012e9aad2 is described below

commit b012e9aad298ad54012b5cda319aef90cbdfa8ab
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Wed Aug 17 17:57:35 2022 -0400

    NIFI-10371: When a component is moved between groups, ensure that its versioned component id is unique within the destination group. Also ensure that when adding a connection to a PG with the VersionedComponentSynchronizer that we prefer obtaining source/destination by instance id instead of versioned id.
    
    Fixed bug where ProcessGroup would inadvertently set the wrong component's Versioned Component ID to null when there was an ID conflict
    
    This closes #6314
    
    Signed-off-by: David Handermann <ex...@apache.org>
---
 .../StandardVersionedComponentSynchronizer.java    | 19 +++++++-
 .../apache/nifi/groups/StandardProcessGroup.java   | 55 ++++++++++++++++++++++
 .../org/apache/nifi/controller/AbstractPort.java   | 10 ++++
 .../nifi/integration/FrameworkIntegrationTest.java |  4 +-
 .../processgroup/StandardProcessGroupIT.java       | 45 ++++++++++++++++++
 .../resources/int-tests/clustered-nifi.properties  |  2 +-
 .../resources/int-tests/default-nifi.properties    |  2 +-
 7 files changed, 133 insertions(+), 4 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
index fa1a07931f..e9c41c21bc 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
@@ -3072,7 +3072,24 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
     }
 
     private Connectable getConnectable(final ProcessGroup group, final ConnectableComponent connectableComponent) {
-        final String id = connectableComponent.getId();
+        // Always prefer the instance identifier, if it's available.
+        final Connectable connectable = getConnectable(group, connectableComponent, ConnectableComponent::getInstanceIdentifier);
+        if (connectable != null) {
+            LOG.debug("Found Connectable {} in Process Group {} by Instance ID {}", connectable, group, connectableComponent.getInstanceIdentifier());
+            return connectable;
+        }
+
+        // If we're synchronizing and the component is not available by the instance ID, lookup the component by the ID instead.
+        final Connectable connectableById = getConnectable(group, connectableComponent, ConnectableComponent::getId);
+        LOG.debug("Found no connectable in Process Group {} by Instance ID. Lookup by ID {} yielded {}", connectable, connectableComponent.getId(), connectableById);
+        return connectableById;
+    }
+
+    private Connectable getConnectable(final ProcessGroup group, final ConnectableComponent connectableComponent, final Function<ConnectableComponent, String> idFunction) {
+        final String id = idFunction.apply(connectableComponent);
+        if (id == null) {
+            return null;
+        }
 
         switch (connectableComponent.getType()) {
             case FUNNEL:
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 3e8ee1ea38..06a83e331a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -612,6 +612,7 @@ public final class StandardProcessGroup implements ProcessGroup {
         try {
             // Unique port check within the same group.
             verifyPortUniqueness(port, inputPorts, this::getInputPortByName);
+            ensureUniqueVersionControlId(port, getInputPorts());
 
             port.setProcessGroup(this);
             inputPorts.put(requireNonNull(port).getIdentifier(), port);
@@ -695,6 +696,7 @@ public final class StandardProcessGroup implements ProcessGroup {
         try {
             // Unique port check within the same group.
             verifyPortUniqueness(port, outputPorts, this::getOutputPortByName);
+            ensureUniqueVersionControlId(port, getOutputPorts());
 
             port.setProcessGroup(this);
             outputPorts.put(port.getIdentifier(), port);
@@ -770,6 +772,8 @@ public final class StandardProcessGroup implements ProcessGroup {
 
         writeLock.lock();
         try {
+            ensureUniqueVersionControlId(group, getProcessGroups());
+
             group.setParent(this);
             group.getVariableRegistry().setParent(getVariableRegistry());
 
@@ -877,6 +881,7 @@ public final class StandardProcessGroup implements ProcessGroup {
                 throw new IllegalStateException("RemoteProcessGroup already exists with ID " + remoteGroup.getIdentifier());
             }
 
+            ensureUniqueVersionControlId(remoteGroup, getRemoteProcessGroups());
             remoteGroup.setProcessGroup(this);
             remoteGroups.put(Objects.requireNonNull(remoteGroup).getIdentifier(), remoteGroup);
             onComponentModified();
@@ -958,6 +963,8 @@ public final class StandardProcessGroup implements ProcessGroup {
                 throw new IllegalStateException("A processor is already registered to this ProcessGroup with ID " + processorId);
             }
 
+            ensureUniqueVersionControlId(processor, getProcessors());
+
             processor.setProcessGroup(this);
             processor.getVariableRegistry().setParent(getVariableRegistry());
             processors.put(processorId, processor);
@@ -971,6 +978,50 @@ public final class StandardProcessGroup implements ProcessGroup {
         }
     }
 
+    /**
+     * A component's Versioned Component ID is used to link a component on the canvas to a component in a versioned flow.
+     * There may, however, be multiple instances of the same versioned flow in a single NiFi instance. In this case, we will have
+     * multiple components with the same Versioned Component ID. This is acceptable as long as no two components within the same Process Group
+     * have the same Versioned Component ID. However, it is not acceptable to have two components within the same Process Group that have the same
+     * Versioned Component ID. If this happens, we will have no way to know which component in our flow maps to which component in the versioned flow.
+     * We don't have an issue with this when a flow is imported, etc. because it is always imported to a new Process Group. However, because it's possible
+     * to move most components between groups, we can have a situation in which a component is moved to a higher group, and that can result in a conflict.
+     * In such a case, we handle this by nulling out the Versioned Component ID if there is a conflict. This essentially makes NiFi behave as if a component
+     * is copied & pasted instead of being moved whenever a conflict occurs.
+     *
+     * @param component the component whose Versioned Component ID should be nulled if there's a conflict
+     * @param componentsToCheck the components to check to determine if there's a conflict
+     */
+    private void ensureUniqueVersionControlId(final org.apache.nifi.components.VersionedComponent component,
+                                              final Collection<? extends org.apache.nifi.components.VersionedComponent> componentsToCheck) {
+        final Optional<String> optionalVersionControlId = component.getVersionedComponentId();
+        if (!optionalVersionControlId.isPresent()) {
+            return;
+        }
+
+        final String versionControlId = optionalVersionControlId.get();
+        final boolean duplicateId = containsVersionedComponentId(componentsToCheck, versionControlId);
+
+        if (duplicateId) {
+            LOG.debug("Adding {} to {}, found conflicting Version Component ID {} so marking Version Component ID of {} as null", component, this, versionControlId, component);
+            component.setVersionedComponentId(null);
+        } else {
+            LOG.debug("Adding {} to {}, found no conflicting Version Component ID for ID {}", component, this, versionControlId);
+        }
+    }
+
+    private boolean containsVersionedComponentId(final Collection<? extends org.apache.nifi.components.VersionedComponent> components, final String id) {
+        for (final org.apache.nifi.components.VersionedComponent component : components) {
+            final Optional<String> optionalConnectableId = component.getVersionedComponentId();
+            if (optionalConnectableId.isPresent() && Objects.equals(optionalConnectableId.get(), id)) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+
     /**
      * Looks for any property that is configured on the given component that references a Controller Service.
      * If any exists, and that Controller Service is not accessible from this Process Group, then the given
@@ -1185,6 +1236,7 @@ public final class StandardProcessGroup implements ProcessGroup {
                 }
             }
 
+            ensureUniqueVersionControlId(connection, getConnections());
             connection.setProcessGroup(this);
             source.addConnection(connection);
             if (source != destination) {  // don't call addConnection twice if it's a self-looping connection.
@@ -1401,6 +1453,7 @@ public final class StandardProcessGroup implements ProcessGroup {
                 throw new IllegalStateException("A label already exists in this ProcessGroup with ID " + label.getIdentifier());
             }
 
+            ensureUniqueVersionControlId(label, getLabels());
             label.setProcessGroup(this);
             labels.put(label.getIdentifier(), label);
             onComponentModified();
@@ -2151,6 +2204,8 @@ public final class StandardProcessGroup implements ProcessGroup {
                 throw new IllegalStateException("A funnel already exists in this ProcessGroup with ID " + funnel.getIdentifier());
             }
 
+            ensureUniqueVersionControlId(funnel, getFunnels());
+
             funnel.setProcessGroup(this);
             funnels.put(funnel.getIdentifier(), funnel);
             flowManager.onFunnelAdded(funnel);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
index d42bc729f1..43968c9d66 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
@@ -42,6 +42,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -167,6 +168,15 @@ public abstract class AbstractPort implements Port {
 
     @Override
     public void setProcessGroup(final ProcessGroup newGroup) {
+        if (this.processGroup.get() != null && !Objects.equals(newGroup, this.processGroup.get())) {
+            // Process Group is changing. For a Port, we effectively want to consider this the same as
+            // deleting an old port and creating a new one, in terms of tracking the port to a versioned flow.
+            // This ensures that we have a unique versioned component id not only in the given process group but also
+            // between the given group and its parent and all children. This is important for ports because we can
+            // connect to/from them between Process Groups, so we need to ensure unique IDs.
+            versionedComponentId.set(null);
+        }
+
         this.processGroup.set(newGroup);
     }
 
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java
index 965b3ac3e9..553c6356a7 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java
@@ -409,7 +409,9 @@ public class FrameworkIntegrationTest {
         final String uuid = getSimpleTypeName(processorType) + "-" + UUID.randomUUID().toString();
         final BundleCoordinate bundleCoordinate = SystemBundle.SYSTEM_BUNDLE_COORDINATE;
         final ProcessorNode procNode = flowController.getFlowManager().createProcessor(processorType, uuid, bundleCoordinate, Collections.emptySet(), true, true, null);
-        destination.addProcessor(procNode);
+        if (destination != null) {
+            destination.addProcessor(procNode);
+        }
 
         return procNode;
     }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/processgroup/StandardProcessGroupIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/processgroup/StandardProcessGroupIT.java
index a2bc54fc44..53cda12c98 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/processgroup/StandardProcessGroupIT.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/processgroup/StandardProcessGroupIT.java
@@ -21,17 +21,20 @@ import org.apache.nifi.components.PropertyDescriptor.Builder;
 import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.controller.ComponentNode;
 import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.StandardSnippet;
 import org.apache.nifi.controller.queue.DropFlowFileState;
 import org.apache.nifi.controller.queue.DropFlowFileStatus;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.integration.FrameworkIntegrationTest;
+import org.apache.nifi.integration.processor.BiConsumerProcessor;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.web.Revision;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -42,8 +45,50 @@ import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 
 public class StandardProcessGroupIT extends FrameworkIntegrationTest {
+
+    @Test
+    public void testConflictingVersionedComponentId() {
+        final ProcessorNode proc1 = createProcessorNode(BiConsumerProcessor.class, null);
+        getRootGroup().addProcessor(proc1);
+
+        final ProcessorNode proc2 = createProcessorNode(BiConsumerProcessor.class, null);
+        proc2.setVersionedComponentId("aaa");
+        getRootGroup().addProcessor(proc2);
+        // Ensure that id didn't change
+        assertEquals("aaa", proc2.getVersionedComponentId().get());
+
+        final ProcessorNode proc3 = createProcessorNode(BiConsumerProcessor.class, null);
+        proc3.setVersionedComponentId("bbb");
+        getRootGroup().addProcessor(proc3);
+        assertEquals("bbb", proc3.getVersionedComponentId().get());
+
+        final ProcessorNode proc4 = createProcessorNode(BiConsumerProcessor.class, null);
+        proc4.setVersionedComponentId("bbb");
+        getRootGroup().addProcessor(proc4);
+        // Ensure that versioned component id was nulled out
+        assertFalse(proc4.getVersionedComponentId().isPresent());
+
+        final ProcessGroup childGroup = getFlowController().getFlowManager().createProcessGroup("child");
+        childGroup.setName("child");
+        getRootGroup().addProcessGroup(childGroup);
+
+        final ProcessorNode proc5 = createProcessorNode(BiConsumerProcessor.class, null);
+        proc5.setVersionedComponentId("bbb");
+        childGroup.addProcessor(proc5);
+        assertEquals("bbb", proc5.getVersionedComponentId().get());
+
+        // Move processor from child group to parent group.
+        // This should null out the ID for proc5 and leave proc3 as is.
+        final StandardSnippet snippet = new StandardSnippet();
+        snippet.addProcessors(Collections.singletonMap(proc5.getIdentifier(), new Revision(0L, "abc", proc5.getIdentifier())));
+        childGroup.move(snippet, getRootGroup());
+        assertFalse(proc5.getVersionedComponentId().isPresent());
+        assertEquals("bbb", proc3.getVersionedComponentId().get());
+    }
+
     @Test
     public void testProcessGroupDefaults() {
         // Connect two processors with default settings of the root process group
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/int-tests/clustered-nifi.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/int-tests/clustered-nifi.properties
index cfbb442c95..95ddce1fc0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/int-tests/clustered-nifi.properties
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/int-tests/clustered-nifi.properties
@@ -85,7 +85,7 @@ nifi.content.viewer.url=../nifi-content-viewer/
 
 # Provenance Repository Properties
 nifi.provenance.repository.implementation=org.apache.nifi.provenance.WriteAheadProvenanceRepository
-nifi.provenance.repository.debug.frequency=1_000_000
+nifi.provenance.repository.debug.frequency=1000000
 
 # Persistent Provenance Repository Properties
 nifi.provenance.repository.directory.default=./target/int-tests/provenance_repository
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/int-tests/default-nifi.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/int-tests/default-nifi.properties
index 2959474e53..6735a6f4f3 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/int-tests/default-nifi.properties
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/int-tests/default-nifi.properties
@@ -85,7 +85,7 @@ nifi.content.viewer.url=../nifi-content-viewer/
 
 # Provenance Repository Properties
 nifi.provenance.repository.implementation=org.apache.nifi.provenance.WriteAheadProvenanceRepository
-nifi.provenance.repository.debug.frequency=1_000_000
+nifi.provenance.repository.debug.frequency=1000000
 
 # Persistent Provenance Repository Properties
 nifi.provenance.repository.directory.default=./target/int-tests/provenance_repository