You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2019/08/28 14:39:19 UTC
[nifi] branch master updated: NIFI-6028: Updates to ignore changes
between local version of a flow and a remote version of a flow if the
difference is the addition of a new property that has the default value
This is an automated email from the ASF dual-hosted git repository.
bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 59734da NIFI-6028: Updates to ignore changes between local version of a flow and a remote version of a flow if the difference is the addition of a new property that has the default value
59734da is described below
commit 59734dad88303591c96b175ab81d6415c6b5e04c
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Mon Jun 17 13:26:29 2019 -0400
NIFI-6028: Updates to ignore changes between local version of a flow and a remote version of a flow if the difference is the addition of a new property that has the default value
NIFI-6028: Code refactoring to address review feedback
This closes #3544.
Signed-off-by: Bryan Bende <bb...@apache.org>
---
.../apache/nifi/groups/StandardProcessGroup.java | 4 +-
.../apache/nifi/util/FlowDifferenceFilters.java | 149 +++++++++++++++++++--
.../apache/nifi/web/StandardNiFiServiceFacade.java | 14 +-
.../org/apache/nifi/web/api/dto/DtoFactory.java | 29 +++-
.../nifi/web/controller/ControllerFacade.java | 4 +
pom.xml | 2 +-
6 files changed, 185 insertions(+), 17 deletions(-)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 52c5cbe..b3e2a7c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -3465,7 +3465,7 @@ public final class StandardProcessGroup implements ProcessGroup {
if (latestVersion == vci.getVersion()) {
versionControlFields.setStale(false);
if (latestVersion == 0) {
- LOG.debug("{} does not have any version in the Registry", this, latestVersion);
+ LOG.debug("{} does not have any version in the Registry", this);
versionControlFields.setLocallyModified(true);
} else {
LOG.debug("{} is currently at the most recent version ({}) of the flow that is under Version Control", this, latestVersion);
@@ -4725,6 +4725,8 @@ public final class StandardProcessGroup implements ProcessGroup {
.filter(FlowDifferenceFilters.FILTER_ADDED_REMOVED_REMOTE_PORTS)
.filter(FlowDifferenceFilters.FILTER_PUBLIC_PORT_NAME_CHANGES)
.filter(FlowDifferenceFilters.FILTER_IGNORABLE_VERSIONED_FLOW_COORDINATE_CHANGES)
+ .filter(diff -> !FlowDifferenceFilters.isNewPropertyWithDefaultValue(diff, flowManager))
+ .filter(diff -> !FlowDifferenceFilters.isNewRelationshipAutoTerminatedAndDefaulted(diff, versionedGroup, flowManager))
.collect(Collectors.toCollection(HashSet::new));
LOG.debug("There are {} differences between this Local Flow and the Versioned Flow: {}", differences.size(), differences);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java
index c4c7403..d594f5f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java
@@ -16,15 +16,29 @@
*/
package org.apache.nifi.util;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ComponentNode;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.processor.Relationship;
import org.apache.nifi.registry.flow.ComponentType;
import org.apache.nifi.registry.flow.VersionedComponent;
+import org.apache.nifi.registry.flow.VersionedConnection;
import org.apache.nifi.registry.flow.VersionedFlowCoordinates;
import org.apache.nifi.registry.flow.VersionedPort;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
+import org.apache.nifi.registry.flow.VersionedProcessor;
import org.apache.nifi.registry.flow.diff.DifferenceType;
import org.apache.nifi.registry.flow.diff.FlowDifference;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedComponent;
+import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedControllerService;
+import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessor;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
import java.util.function.Predicate;
public class FlowDifferenceFilters {
@@ -32,9 +46,7 @@ public class FlowDifferenceFilters {
/**
* Predicate that returns true if the difference is NOT a name change on a public port (i.e. VersionedPort that allows remote access).
*/
- public static Predicate<FlowDifference> FILTER_PUBLIC_PORT_NAME_CHANGES = (fd) -> {
- return !isPublicPortNameChange(fd);
- };
+ public static Predicate<FlowDifference> FILTER_PUBLIC_PORT_NAME_CHANGES = (fd) -> !isPublicPortNameChange(fd);
public static boolean isPublicPortNameChange(final FlowDifference fd) {
final VersionedComponent versionedComponent = fd.getComponentA();
@@ -51,9 +63,7 @@ public class FlowDifferenceFilters {
/**
* Predicate that returns true if the difference is NOT a remote port being added, and false if it is.
*/
- public static Predicate<FlowDifference> FILTER_ADDED_REMOVED_REMOTE_PORTS = (fd) -> {
- return !isAddedOrRemovedRemotePort(fd);
- };
+ public static Predicate<FlowDifference> FILTER_ADDED_REMOVED_REMOTE_PORTS = (fd) -> !isAddedOrRemovedRemotePort(fd);
public static boolean isAddedOrRemovedRemotePort(final FlowDifference fd) {
if (fd.getDifferenceType() == DifferenceType.COMPONENT_ADDED || fd.getDifferenceType() == DifferenceType.COMPONENT_REMOVED) {
@@ -71,19 +81,14 @@ public class FlowDifferenceFilters {
return false;
}
- public static Predicate<FlowDifference> FILTER_IGNORABLE_VERSIONED_FLOW_COORDINATE_CHANGES = (fd) -> {
- return !isIgnorableVersionedFlowCoordinateChange(fd);
- };
+ public static Predicate<FlowDifference> FILTER_IGNORABLE_VERSIONED_FLOW_COORDINATE_CHANGES = (fd) -> !isIgnorableVersionedFlowCoordinateChange(fd);
public static boolean isIgnorableVersionedFlowCoordinateChange(final FlowDifference fd) {
if (fd.getDifferenceType() == DifferenceType.VERSIONED_FLOW_COORDINATES_CHANGED) {
final VersionedComponent componentA = fd.getComponentA();
final VersionedComponent componentB = fd.getComponentB();
- if (componentA != null && componentB != null
- && componentA instanceof VersionedProcessGroup
- && componentB instanceof VersionedProcessGroup) {
-
+ if (componentA instanceof VersionedProcessGroup && componentB instanceof VersionedProcessGroup) {
final VersionedProcessGroup versionedProcessGroupA = (VersionedProcessGroup) componentA;
final VersionedProcessGroup versionedProcessGroupB = (VersionedProcessGroup) componentB;
@@ -113,4 +118,122 @@ public class FlowDifferenceFilters {
return false;
}
+
+
+ public static boolean isNewPropertyWithDefaultValue(final FlowDifference fd, final FlowManager flowManager) {
+ if (fd.getDifferenceType() != DifferenceType.PROPERTY_ADDED) {
+ return false;
+ }
+
+ final VersionedComponent componentB = fd.getComponentB();
+
+ if (componentB instanceof InstantiatedVersionedProcessor) {
+ final InstantiatedVersionedProcessor instantiatedProcessor = (InstantiatedVersionedProcessor) componentB;
+ final ProcessorNode processorNode = flowManager.getProcessorNode(instantiatedProcessor.getInstanceId());
+ return isNewPropertyWithDefaultValue(fd, processorNode);
+ } else if (componentB instanceof InstantiatedVersionedControllerService) {
+ final InstantiatedVersionedControllerService instantiatedControllerService = (InstantiatedVersionedControllerService) componentB;
+ final ControllerServiceNode controllerService = flowManager.getControllerServiceNode(instantiatedControllerService.getInstanceId());
+ return isNewPropertyWithDefaultValue(fd, controllerService);
+ }
+
+ return false;
+ }
+
+ private static boolean isNewPropertyWithDefaultValue(final FlowDifference fd, final ComponentNode componentNode) {
+ if (componentNode == null) {
+ return false;
+ }
+
+ final Optional<String> optionalFieldName = fd.getFieldName();
+ if (!optionalFieldName.isPresent()) {
+ return false;
+ }
+
+ final String fieldName = optionalFieldName.get();
+ final PropertyDescriptor propertyDescriptor = componentNode.getPropertyDescriptor(fieldName);
+ if (propertyDescriptor == null) {
+ return false;
+ }
+
+ if (Objects.equals(fd.getValueB(), propertyDescriptor.getDefaultValue())) {
+ return true;
+ }
+
+ return false;
+ }
+
+
+ public static boolean isNewRelationshipAutoTerminatedAndDefaulted(final FlowDifference fd, final VersionedProcessGroup processGroup, final FlowManager flowManager) {
+ if (fd.getDifferenceType() != DifferenceType.AUTO_TERMINATED_RELATIONSHIPS_CHANGED) {
+ return false;
+ }
+
+ if (!(fd.getComponentA() instanceof VersionedProcessor) || !(fd.getComponentB() instanceof InstantiatedVersionedProcessor)) {
+ // Should not happen, since only processors have auto-terminated relationships.
+ return false;
+ }
+
+ final VersionedProcessor processorA = (VersionedProcessor) fd.getComponentA();
+ final VersionedProcessor processorB = (VersionedProcessor) fd.getComponentB();
+
+ // Determine if this Flow Difference indicates that Processor B has all of the same Auto-Terminated Relationships as Processor A, plus some.
+ // If that is the case, then it may be that a new Relationship was added, defaulting to 'Auto-Terminated' and that Processor B is still auto-terminated.
+ // We want to be able to identify that case.
+ final Set<String> autoTerminatedA = processorA.getAutoTerminatedRelationships();
+ final Set<String> autoTerminatedB = processorB.getAutoTerminatedRelationships();
+
+ // If B is smaller than A, then B cannot possibly contain all of A. So use that as a first comparison to avoid the expense of #containsAll
+ if (autoTerminatedB.size() < autoTerminatedA.size() || !autoTerminatedB.containsAll(autoTerminatedA)) {
+ // If B does not contain all of A, then the FlowDifference is indicative of some other change.
+ return false;
+ }
+
+ final InstantiatedVersionedProcessor instantiatedVersionedProcessor = (InstantiatedVersionedProcessor) processorB;
+ final ProcessorNode processorNode = flowManager.getProcessorNode(instantiatedVersionedProcessor.getInstanceId());
+ if (processorNode == null) {
+ return false;
+ }
+
+ final Set<String> newlyAddedAutoTerminated = new HashSet<>(autoTerminatedB);
+ newlyAddedAutoTerminated.removeAll(autoTerminatedA);
+
+ for (final String relationshipName : newlyAddedAutoTerminated) {
+ final Relationship relationship = processorNode.getRelationship(relationshipName);
+ if (relationship == null) {
+ return false;
+ }
+
+ final boolean defaultAutoTerminated = relationship.isAutoTerminated();
+ if (!defaultAutoTerminated) {
+ return false;
+ }
+
+ if (hasConnection(processGroup, processorA, relationshipName)) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+
+ /**
+ * Determines whether or not the given Process Group has a Connection whose source is the given Processor and that contains the given relationship
+ *
+ * @param processGroup the process group
+ * @param processor the source processor
+ * @param relationship the relationship
+ *
+ * @return <code>true</code> if such a connection exists, <code>false</code> otherwise.
+ */
+ private static boolean hasConnection(final VersionedProcessGroup processGroup, final VersionedProcessor processor, final String relationship) {
+ for (final VersionedConnection connection : processGroup.getConnections()) {
+ if (connection.getSource().getId().equals(processor.getIdentifier()) && connection.getSelectedRelationships().contains(relationship)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 5ba1da6..da5ada2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -77,6 +77,7 @@ import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.Snippet;
import org.apache.nifi.controller.Template;
+import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.leader.election.LeaderElectionManager;
import org.apache.nifi.controller.repository.claim.ContentDirection;
@@ -4382,7 +4383,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
final FlowComparator flowComparator = new StandardFlowComparator(registryFlow, localFlow, ancestorServiceIds, new ConciseEvolvingDifferenceDescriptor());
final FlowComparison flowComparison = flowComparator.compare();
- final Set<ComponentDifferenceDTO> differenceDtos = dtoFactory.createComponentDifferenceDtos(flowComparison);
+ final Set<ComponentDifferenceDTO> differenceDtos = dtoFactory.createComponentDifferenceDtos(flowComparison, controllerFacade.getFlowManager());
final FlowComparisonEntity entity = new FlowComparisonEntity();
entity.setComponentDifferences(differenceDtos);
@@ -4517,11 +4518,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
final FlowComparator flowComparator = new StandardFlowComparator(localFlow, proposedFlow, ancestorGroupServiceIds, new StaticDifferenceDescriptor());
final FlowComparison comparison = flowComparator.compare();
+ final FlowManager flowManager = controllerFacade.getFlowManager();
final Set<AffectedComponentEntity> affectedComponents = comparison.getDifferences().stream()
.filter(difference -> difference.getDifferenceType() != DifferenceType.COMPONENT_ADDED) // components that are added are not components that will be affected in the local flow.
.filter(difference -> difference.getDifferenceType() != DifferenceType.BUNDLE_CHANGED)
.filter(FlowDifferenceFilters.FILTER_ADDED_REMOVED_REMOTE_PORTS)
.filter(FlowDifferenceFilters.FILTER_IGNORABLE_VERSIONED_FLOW_COORDINATE_CHANGES)
+ .filter(diff -> !FlowDifferenceFilters.isNewPropertyWithDefaultValue(diff, flowManager))
+ .filter(diff -> !FlowDifferenceFilters.isNewRelationshipAutoTerminatedAndDefaulted(diff, proposedFlow.getContents(), flowManager))
.map(difference -> {
final VersionedComponent localComponent = difference.getComponentA();
@@ -4572,6 +4576,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
continue;
}
+ if (FlowDifferenceFilters.isNewPropertyWithDefaultValue(difference, controllerFacade.getFlowManager())) {
+ continue;
+ }
+
+ if (FlowDifferenceFilters.isNewRelationshipAutoTerminatedAndDefaulted(difference, updatedSnapshot.getFlowContents(), controllerFacade.getFlowManager())) {
+ continue;
+ }
+
final VersionedComponent localComponent = difference.getComponentA();
if (localComponent == null) {
continue;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index 97b8c5e..3adc531 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -82,6 +82,7 @@ import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.Snippet;
import org.apache.nifi.controller.Template;
+import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.queue.DropFlowFileState;
import org.apache.nifi.controller.queue.DropFlowFileStatus;
@@ -138,6 +139,7 @@ import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.registry.flow.VersionedComponent;
import org.apache.nifi.registry.flow.VersionedFlowState;
import org.apache.nifi.registry.flow.VersionedFlowStatus;
+import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.apache.nifi.registry.flow.diff.DifferenceType;
import org.apache.nifi.registry.flow.diff.FlowComparison;
import org.apache.nifi.registry.flow.diff.FlowDifference;
@@ -2423,9 +2425,11 @@ public final class DtoFactory {
}
- public Set<ComponentDifferenceDTO> createComponentDifferenceDtos(final FlowComparison comparison) {
+ public Set<ComponentDifferenceDTO> createComponentDifferenceDtos(final FlowComparison comparison, final FlowManager flowManager) {
final Map<ComponentDifferenceDTO, List<DifferenceDTO>> differencesByComponent = new HashMap<>();
+ final Map<String, VersionedProcessGroup> versionedGroups = flattenProcessGroups(comparison.getFlowA().getContents());
+
for (final FlowDifference difference : comparison.getDifferences()) {
// Ignore these as local differences for now because we can't do anything with it
if (difference.getDifferenceType() == DifferenceType.BUNDLE_CHANGED) {
@@ -2446,6 +2450,15 @@ public final class DtoFactory {
continue;
}
+ if (FlowDifferenceFilters.isNewPropertyWithDefaultValue(difference, flowManager)) {
+ continue;
+ }
+
+ final VersionedProcessGroup relevantProcessGroup = versionedGroups.get(difference.getComponentA().getGroupIdentifier());
+ if (relevantProcessGroup != null && FlowDifferenceFilters.isNewRelationshipAutoTerminatedAndDefaulted(difference, relevantProcessGroup, flowManager)) {
+ continue;
+ }
+
final ComponentDifferenceDTO componentDiff = createComponentDifference(difference);
final List<DifferenceDTO> differences = differencesByComponent.computeIfAbsent(componentDiff, key -> new ArrayList<>());
@@ -2463,6 +2476,20 @@ public final class DtoFactory {
return differencesByComponent.keySet();
}
+ private Map<String, VersionedProcessGroup> flattenProcessGroups(final VersionedProcessGroup group) {
+ final Map<String, VersionedProcessGroup> flattened = new HashMap<>();
+ flattenProcessGroups(group, flattened);
+ return flattened;
+ }
+
+ private void flattenProcessGroups(final VersionedProcessGroup group, final Map<String, VersionedProcessGroup> flattened) {
+ flattened.put(group.getIdentifier(), group);
+
+ for (final VersionedProcessGroup child : group.getProcessGroups()) {
+ flattenProcessGroups(child, flattened);
+ }
+ }
+
private ComponentDifferenceDTO createComponentDifference(final FlowDifference difference) {
VersionedComponent component = difference.getComponentA();
if (component == null || difference.getComponentB() instanceof InstantiatedVersionedComponent) {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
index e560516..4504e0e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
@@ -180,6 +180,10 @@ public class ControllerFacade implements Authorizable {
return flowController.getExtensionManager();
}
+ public FlowManager getFlowManager() {
+ return flowController.getFlowManager();
+ }
+
/**
* Sets the name of this controller.
*
diff --git a/pom.xml b/pom.xml
index 1df56cb..809bc2e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -96,7 +96,7 @@
<ranger.version>1.0.0</ranger.version>
<jetty.version>9.4.19.v20190610</jetty.version>
<jackson.version>2.9.9</jackson.version>
- <nifi.registry.version>0.4.0</nifi.registry.version>
+ <nifi.registry.version>0.5.0</nifi.registry.version>
<nifi.groovy.version>2.5.4</nifi.groovy.version>
<surefire.version>2.22.0</surefire.version>
</properties>