You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2019/08/28 14:39:19 UTC

[nifi] branch master updated: NIFI-6028: Updates to ignore changes between local version of a flow and a remote version of a flow if the difference is the addition of a new property that has the default value

This is an automated email from the ASF dual-hosted git repository.

bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 59734da  NIFI-6028: Updates to ignore changes between local version of a flow and a remote version of a flow if the difference is the addition of a new property that has the default value
59734da is described below

commit 59734dad88303591c96b175ab81d6415c6b5e04c
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Mon Jun 17 13:26:29 2019 -0400

    NIFI-6028: Updates to ignore changes between local version of a flow and a remote version of a flow if the difference is the addition of a new property that has the default value
    
    NIFI-6028: Code refactoring to address review feedback
    
    This closes #3544.
    
    Signed-off-by: Bryan Bende <bb...@apache.org>
---
 .../apache/nifi/groups/StandardProcessGroup.java   |   4 +-
 .../apache/nifi/util/FlowDifferenceFilters.java    | 149 +++++++++++++++++++--
 .../apache/nifi/web/StandardNiFiServiceFacade.java |  14 +-
 .../org/apache/nifi/web/api/dto/DtoFactory.java    |  29 +++-
 .../nifi/web/controller/ControllerFacade.java      |   4 +
 pom.xml                                            |   2 +-
 6 files changed, 185 insertions(+), 17 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 52c5cbe..b3e2a7c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -3465,7 +3465,7 @@ public final class StandardProcessGroup implements ProcessGroup {
             if (latestVersion == vci.getVersion()) {
                 versionControlFields.setStale(false);
                 if (latestVersion == 0) {
-                    LOG.debug("{} does not have any version in the Registry", this, latestVersion);
+                    LOG.debug("{} does not have any version in the Registry", this);
                     versionControlFields.setLocallyModified(true);
                 } else {
                     LOG.debug("{} is currently at the most recent version ({}) of the flow that is under Version Control", this, latestVersion);
@@ -4725,6 +4725,8 @@ public final class StandardProcessGroup implements ProcessGroup {
                 .filter(FlowDifferenceFilters.FILTER_ADDED_REMOVED_REMOTE_PORTS)
                 .filter(FlowDifferenceFilters.FILTER_PUBLIC_PORT_NAME_CHANGES)
                 .filter(FlowDifferenceFilters.FILTER_IGNORABLE_VERSIONED_FLOW_COORDINATE_CHANGES)
+                .filter(diff -> !FlowDifferenceFilters.isNewPropertyWithDefaultValue(diff, flowManager))
+                .filter(diff -> !FlowDifferenceFilters.isNewRelationshipAutoTerminatedAndDefaulted(diff, versionedGroup, flowManager))
                 .collect(Collectors.toCollection(HashSet::new));
 
         LOG.debug("There are {} differences between this Local Flow and the Versioned Flow: {}", differences.size(), differences);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java
index c4c7403..d594f5f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java
@@ -16,15 +16,29 @@
  */
 package org.apache.nifi.util;
 
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ComponentNode;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.registry.flow.ComponentType;
 import org.apache.nifi.registry.flow.VersionedComponent;
+import org.apache.nifi.registry.flow.VersionedConnection;
 import org.apache.nifi.registry.flow.VersionedFlowCoordinates;
 import org.apache.nifi.registry.flow.VersionedPort;
 import org.apache.nifi.registry.flow.VersionedProcessGroup;
+import org.apache.nifi.registry.flow.VersionedProcessor;
 import org.apache.nifi.registry.flow.diff.DifferenceType;
 import org.apache.nifi.registry.flow.diff.FlowDifference;
 import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedComponent;
+import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedControllerService;
+import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessor;
 
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
 import java.util.function.Predicate;
 
 public class FlowDifferenceFilters {
@@ -32,9 +46,7 @@ public class FlowDifferenceFilters {
     /**
      * Predicate that returns true if the difference is NOT a name change on a public port (i.e. VersionedPort that allows remote access).
      */
-    public static Predicate<FlowDifference> FILTER_PUBLIC_PORT_NAME_CHANGES = (fd) -> {
-        return !isPublicPortNameChange(fd);
-    };
+    public static Predicate<FlowDifference> FILTER_PUBLIC_PORT_NAME_CHANGES = (fd) -> !isPublicPortNameChange(fd);
 
     public static boolean isPublicPortNameChange(final FlowDifference fd) {
         final VersionedComponent versionedComponent = fd.getComponentA();
@@ -51,9 +63,7 @@ public class FlowDifferenceFilters {
     /**
      * Predicate that returns true if the difference is NOT a remote port being added, and false if it is.
      */
-    public static Predicate<FlowDifference> FILTER_ADDED_REMOVED_REMOTE_PORTS =  (fd) -> {
-        return !isAddedOrRemovedRemotePort(fd);
-    };
+    public static Predicate<FlowDifference> FILTER_ADDED_REMOVED_REMOTE_PORTS =  (fd) -> !isAddedOrRemovedRemotePort(fd);
 
     public static boolean isAddedOrRemovedRemotePort(final FlowDifference fd) {
         if (fd.getDifferenceType() == DifferenceType.COMPONENT_ADDED || fd.getDifferenceType() == DifferenceType.COMPONENT_REMOVED) {
@@ -71,19 +81,14 @@ public class FlowDifferenceFilters {
         return false;
     }
 
-    public static Predicate<FlowDifference> FILTER_IGNORABLE_VERSIONED_FLOW_COORDINATE_CHANGES = (fd) -> {
-        return !isIgnorableVersionedFlowCoordinateChange(fd);
-    };
+    public static Predicate<FlowDifference> FILTER_IGNORABLE_VERSIONED_FLOW_COORDINATE_CHANGES = (fd) -> !isIgnorableVersionedFlowCoordinateChange(fd);
 
     public static boolean isIgnorableVersionedFlowCoordinateChange(final FlowDifference fd) {
         if (fd.getDifferenceType() == DifferenceType.VERSIONED_FLOW_COORDINATES_CHANGED) {
             final VersionedComponent componentA = fd.getComponentA();
             final VersionedComponent componentB = fd.getComponentB();
 
-            if (componentA != null && componentB != null
-                    && componentA instanceof VersionedProcessGroup
-                    && componentB instanceof VersionedProcessGroup) {
-
+            if (componentA instanceof VersionedProcessGroup && componentB instanceof VersionedProcessGroup) {
                 final VersionedProcessGroup versionedProcessGroupA = (VersionedProcessGroup) componentA;
                 final VersionedProcessGroup versionedProcessGroupB = (VersionedProcessGroup) componentB;
 
@@ -113,4 +118,122 @@ public class FlowDifferenceFilters {
 
         return false;
     }
+
+
+    /** Returns true for a PROPERTY_ADDED difference on an instantiated processor or controller service whose B-side value is the property's default. */
+    public static boolean isNewPropertyWithDefaultValue(final FlowDifference fd, final FlowManager flowManager) {
+        if (fd.getDifferenceType() != DifferenceType.PROPERTY_ADDED) {
+            return false;
+        }
+
+        // Component B is resolved to its node through the FlowManager; any other kind of component is never ignored.
+        final VersionedComponent instantiated = fd.getComponentB();
+        final ComponentNode node;
+        if (instantiated instanceof InstantiatedVersionedProcessor) {
+            node = flowManager.getProcessorNode(((InstantiatedVersionedProcessor) instantiated).getInstanceId());
+        } else if (instantiated instanceof InstantiatedVersionedControllerService) {
+            node = flowManager.getControllerServiceNode(((InstantiatedVersionedControllerService) instantiated).getInstanceId());
+        } else {
+            return false;
+        }
+
+        return isNewPropertyWithDefaultValue(fd, node);
+    }
+
+    private static boolean isNewPropertyWithDefaultValue(final FlowDifference fd, final ComponentNode componentNode) {
+        if (componentNode == null) {
+            return false;
+        }
+
+        final Optional<String> optionalFieldName = fd.getFieldName();
+        if (!optionalFieldName.isPresent()) {
+            return false;
+        }
+
+        final String fieldName = optionalFieldName.get();
+        final PropertyDescriptor propertyDescriptor = componentNode.getPropertyDescriptor(fieldName);
+        if (propertyDescriptor == null) {
+            return false;
+        }
+
+        if (Objects.equals(fd.getValueB(), propertyDescriptor.getDefaultValue())) {
+            return true;
+        }
+
+        return false;
+    }
+
+
+    /**
+     * Determines whether the difference consists only of additional auto-terminated relationships on Processor B.
+     * Each relationship that is auto-terminated on B but not on A must be known to B's ProcessorNode, must report
+     * isAutoTerminated(), and must not be selected by any connection of the given group whose source is Processor A.
+     *
+     * @param fd the difference to inspect
+     * @param processGroup the versioned group whose connections are checked
+     * @param flowManager used to look up the ProcessorNode for Processor B
+     *
+     * @return <code>true</code> if the difference can be attributed to such additional relationships, <code>false</code> otherwise
+     */
+    public static boolean isNewRelationshipAutoTerminatedAndDefaulted(final FlowDifference fd, final VersionedProcessGroup processGroup, final FlowManager flowManager) {
+        if (fd.getDifferenceType() != DifferenceType.AUTO_TERMINATED_RELATIONSHIPS_CHANGED) {
+            return false;
+        }
+
+        if (!(fd.getComponentA() instanceof VersionedProcessor) || !(fd.getComponentB() instanceof InstantiatedVersionedProcessor)) {
+            // Should not happen, since only processors have auto-terminated relationships.
+            return false;
+        }
+
+        final VersionedProcessor processorA = (VersionedProcessor) fd.getComponentA();
+        final InstantiatedVersionedProcessor processorB = (InstantiatedVersionedProcessor) fd.getComponentB();
+
+        // Defensive: nothing here guarantees the getter returns a non-null Set, so a null is treated as "no relationships" instead of throwing.
+        final Set<String> relationshipsA = processorA.getAutoTerminatedRelationships();
+        final Set<String> relationshipsB = processorB.getAutoTerminatedRelationships();
+        final Set<String> autoTerminatedA = relationshipsA == null ? new HashSet<>() : relationshipsA;
+        final Set<String> autoTerminatedB = relationshipsB == null ? new HashSet<>() : relationshipsB;
+
+        // If B does not contain all of A, the difference is indicative of some other change. The size check avoids the expense of #containsAll.
+        if (autoTerminatedB.size() < autoTerminatedA.size() || !autoTerminatedB.containsAll(autoTerminatedA)) {
+            return false;
+        }
+
+        final ProcessorNode processorNode = flowManager.getProcessorNode(processorB.getInstanceId());
+        if (processorNode == null) {
+            return false;
+        }
+
+        final Set<String> newlyAddedAutoTerminated = new HashSet<>(autoTerminatedB);
+        newlyAddedAutoTerminated.removeAll(autoTerminatedA);
+
+        for (final String relationshipName : newlyAddedAutoTerminated) {
+            final Relationship relationship = processorNode.getRelationship(relationshipName);
+            if (relationship == null || !relationship.isAutoTerminated() || hasConnection(processGroup, processorA, relationshipName)) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+
+    /**
+     * Determines whether or not the given Process Group has a Connection whose source is the given Processor and that contains the given relationship
+     *
+     * @param processGroup the process group
+     * @param processor the source processor
+     * @param relationship the relationship
+     *
+     * @return <code>true</code> if such a connection exists, <code>false</code> otherwise.
+     */
+    private static boolean hasConnection(final VersionedProcessGroup processGroup, final VersionedProcessor processor, final String relationship) {
+        for (final VersionedConnection connection : processGroup.getConnections()) {
+            if (connection.getSource().getId().equals(processor.getIdentifier()) && connection.getSelectedRelationships().contains(relationship)) {
+                return true;
+            }
+        }
+
+        return false;
+    }
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 5ba1da6..da5ada2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -77,6 +77,7 @@ import org.apache.nifi.controller.ReportingTaskNode;
 import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.Snippet;
 import org.apache.nifi.controller.Template;
+import org.apache.nifi.controller.flow.FlowManager;
 import org.apache.nifi.controller.label.Label;
 import org.apache.nifi.controller.leader.election.LeaderElectionManager;
 import org.apache.nifi.controller.repository.claim.ContentDirection;
@@ -4382,7 +4383,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
         final FlowComparator flowComparator = new StandardFlowComparator(registryFlow, localFlow, ancestorServiceIds, new ConciseEvolvingDifferenceDescriptor());
         final FlowComparison flowComparison = flowComparator.compare();
 
-        final Set<ComponentDifferenceDTO> differenceDtos = dtoFactory.createComponentDifferenceDtos(flowComparison);
+        final Set<ComponentDifferenceDTO> differenceDtos = dtoFactory.createComponentDifferenceDtos(flowComparison, controllerFacade.getFlowManager());
 
         final FlowComparisonEntity entity = new FlowComparisonEntity();
         entity.setComponentDifferences(differenceDtos);
@@ -4517,11 +4518,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
         final FlowComparator flowComparator = new StandardFlowComparator(localFlow, proposedFlow, ancestorGroupServiceIds, new StaticDifferenceDescriptor());
         final FlowComparison comparison = flowComparator.compare();
 
+        final FlowManager flowManager = controllerFacade.getFlowManager();
         final Set<AffectedComponentEntity> affectedComponents = comparison.getDifferences().stream()
             .filter(difference -> difference.getDifferenceType() != DifferenceType.COMPONENT_ADDED) // components that are added are not components that will be affected in the local flow.
             .filter(difference -> difference.getDifferenceType() != DifferenceType.BUNDLE_CHANGED)
             .filter(FlowDifferenceFilters.FILTER_ADDED_REMOVED_REMOTE_PORTS)
             .filter(FlowDifferenceFilters.FILTER_IGNORABLE_VERSIONED_FLOW_COORDINATE_CHANGES)
+            .filter(diff -> !FlowDifferenceFilters.isNewPropertyWithDefaultValue(diff, flowManager))
+            .filter(diff -> !FlowDifferenceFilters.isNewRelationshipAutoTerminatedAndDefaulted(diff, proposedFlow.getContents(), flowManager))
             .map(difference -> {
                 final VersionedComponent localComponent = difference.getComponentA();
 
@@ -4572,6 +4576,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
                 continue;
             }
 
+            if (FlowDifferenceFilters.isNewPropertyWithDefaultValue(difference, controllerFacade.getFlowManager())) {
+                continue;
+            }
+
+            if (FlowDifferenceFilters.isNewRelationshipAutoTerminatedAndDefaulted(difference, updatedSnapshot.getFlowContents(), controllerFacade.getFlowManager())) {
+                continue;
+            }
+
             final VersionedComponent localComponent = difference.getComponentA();
             if (localComponent == null) {
                 continue;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index 97b8c5e..3adc531 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -82,6 +82,7 @@ import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.ReportingTaskNode;
 import org.apache.nifi.controller.Snippet;
 import org.apache.nifi.controller.Template;
+import org.apache.nifi.controller.flow.FlowManager;
 import org.apache.nifi.controller.label.Label;
 import org.apache.nifi.controller.queue.DropFlowFileState;
 import org.apache.nifi.controller.queue.DropFlowFileStatus;
@@ -138,6 +139,7 @@ import org.apache.nifi.registry.flow.VersionControlInformation;
 import org.apache.nifi.registry.flow.VersionedComponent;
 import org.apache.nifi.registry.flow.VersionedFlowState;
 import org.apache.nifi.registry.flow.VersionedFlowStatus;
+import org.apache.nifi.registry.flow.VersionedProcessGroup;
 import org.apache.nifi.registry.flow.diff.DifferenceType;
 import org.apache.nifi.registry.flow.diff.FlowComparison;
 import org.apache.nifi.registry.flow.diff.FlowDifference;
@@ -2423,9 +2425,11 @@ public final class DtoFactory {
     }
 
 
-    public Set<ComponentDifferenceDTO> createComponentDifferenceDtos(final FlowComparison comparison) {
+    public Set<ComponentDifferenceDTO> createComponentDifferenceDtos(final FlowComparison comparison, final FlowManager flowManager) {
         final Map<ComponentDifferenceDTO, List<DifferenceDTO>> differencesByComponent = new HashMap<>();
 
+        final Map<String, VersionedProcessGroup> versionedGroups = flattenProcessGroups(comparison.getFlowA().getContents());
+
         for (final FlowDifference difference : comparison.getDifferences()) {
             // Ignore these as local differences for now because we can't do anything with it
             if (difference.getDifferenceType() == DifferenceType.BUNDLE_CHANGED) {
@@ -2446,6 +2450,15 @@ public final class DtoFactory {
                 continue;
             }
 
+            if (FlowDifferenceFilters.isNewPropertyWithDefaultValue(difference, flowManager)) {
+                continue;
+            }
+
+            final VersionedProcessGroup relevantProcessGroup = versionedGroups.get(difference.getComponentA().getGroupIdentifier());
+            if (relevantProcessGroup != null && FlowDifferenceFilters.isNewRelationshipAutoTerminatedAndDefaulted(difference, relevantProcessGroup, flowManager)) {
+                continue;
+            }
+
             final ComponentDifferenceDTO componentDiff = createComponentDifference(difference);
             final List<DifferenceDTO> differences = differencesByComponent.computeIfAbsent(componentDiff, key -> new ArrayList<>());
 
@@ -2463,6 +2476,20 @@ public final class DtoFactory {
         return differencesByComponent.keySet();
     }
 
+    /** Builds a lookup from group identifier to group, covering the given group and every group nested beneath it. */
+    private Map<String, VersionedProcessGroup> flattenProcessGroups(final VersionedProcessGroup group) {
+        final Map<String, VersionedProcessGroup> byIdentifier = new HashMap<>();
+        flattenProcessGroups(group, byIdentifier);
+        return byIdentifier;
+    }
+
+    /** Registers the given group under its identifier, then recurses into each of its child groups. */
+    private void flattenProcessGroups(final VersionedProcessGroup group, final Map<String, VersionedProcessGroup> flattened) {
+        flattened.put(group.getIdentifier(), group);
+
+        group.getProcessGroups().forEach(child -> flattenProcessGroups(child, flattened));
+    }
+
     private ComponentDifferenceDTO createComponentDifference(final FlowDifference difference) {
         VersionedComponent component = difference.getComponentA();
         if (component == null || difference.getComponentB() instanceof InstantiatedVersionedComponent) {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
index e560516..4504e0e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
@@ -180,6 +180,10 @@ public class ControllerFacade implements Authorizable {
         return flowController.getExtensionManager();
     }
 
+    public FlowManager getFlowManager() {
+        return flowController.getFlowManager(); // plain delegation to the flowController field; no caching or copying here
+    }
+
     /**
      * Sets the name of this controller.
      *
diff --git a/pom.xml b/pom.xml
index 1df56cb..809bc2e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -96,7 +96,7 @@
         <ranger.version>1.0.0</ranger.version>
         <jetty.version>9.4.19.v20190610</jetty.version>
         <jackson.version>2.9.9</jackson.version>
-        <nifi.registry.version>0.4.0</nifi.registry.version>
+        <nifi.registry.version>0.5.0</nifi.registry.version>
         <nifi.groovy.version>2.5.4</nifi.groovy.version>
         <surefire.version>2.22.0</surefire.version>
     </properties>