You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2020/05/15 20:16:51 UTC

[nifi] branch master updated: NIFI-7460: Avoid NPE when a VersionedProcessor has a null value for autoTerminatedRelationships. Added additional logging and improved error handling around syncing with invalid flows

This is an automated email from the ASF dual-hosted git repository.

bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new c51b905  NIFI-7460: Avoid NPE when a VersionedProcessor has a null value for autoTerminatedRelationships. Added additional logging and improved error handling around syncing with invalid flows
c51b905 is described below

commit c51b9051a8f503af1e1793c952b150895a0399ce
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Fri May 15 14:25:51 2020 -0400

    NIFI-7460: Avoid NPE when a VersionedProcessor has a null value for autoTerminatedRelationships. Added additional logging and improved error handling around syncing with invalid flows
---
 .../apache/nifi/groups/StandardProcessGroup.java   | 79 ++++++++++++++--------
 .../apache/nifi/reporting/StandardEventAccess.java | 18 ++++-
 .../apache/nifi/util/FlowDifferenceFilters.java    |  8 ++-
 3 files changed, 72 insertions(+), 33 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index e7af89c..0c091d8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -374,7 +374,16 @@ public final class StandardProcessGroup implements ProcessGroup {
                 // update the vci counts for this child group
                 final VersionControlInformation vci = childGroup.getVersionControlInformation();
                 if (vci != null) {
-                    switch (vci.getStatus().getState()) {
+                    final VersionedFlowStatus flowStatus;
+                    try {
+                        flowStatus = vci.getStatus();
+                    } catch (final Exception e) {
+                        LOG.warn("Could not determine Version Control State for {}. Will consider state to be SYNC_FAILURE", this, e);
+                        syncFailure++;
+                        continue;
+                    }
+
+                    switch (flowStatus.getState()) {
                         case LOCALLY_MODIFIED:
                             locallyModified++;
                             break;
@@ -1646,7 +1655,10 @@ public final class StandardProcessGroup implements ProcessGroup {
 
     @Override
     public String toString() {
-        return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("identifier", getIdentifier()).toString();
+        return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
+            .append("identifier", getIdentifier())
+            .append("name", getName())
+            .toString();
     }
 
     @Override
@@ -3345,28 +3357,33 @@ public final class StandardProcessGroup implements ProcessGroup {
                     return new StandardVersionedFlowStatus(VersionedFlowState.SYNC_FAILURE, syncFailureExplanation);
                 }
 
-                final boolean modified = isModified();
-                if (!modified) {
-                    final VersionControlInformation vci = StandardProcessGroup.this.versionControlInfo.get();
-                    if (vci.getFlowSnapshot() == null) {
-                        return new StandardVersionedFlowStatus(VersionedFlowState.SYNC_FAILURE, "Process Group has not yet been synchronized with Flow Registry");
+                try {
+                    final boolean modified = isModified();
+                    if (!modified) {
+                        final VersionControlInformation vci = StandardProcessGroup.this.versionControlInfo.get();
+                        if (vci.getFlowSnapshot() == null) {
+                            return new StandardVersionedFlowStatus(VersionedFlowState.SYNC_FAILURE, "Process Group has not yet been synchronized with Flow Registry");
+                        }
                     }
-                }
 
-                final boolean stale = versionControlFields.isStale();
+                    final boolean stale = versionControlFields.isStale();
 
-                final VersionedFlowState flowState;
-                if (modified && stale) {
-                    flowState = VersionedFlowState.LOCALLY_MODIFIED_AND_STALE;
-                } else if (modified) {
-                    flowState = VersionedFlowState.LOCALLY_MODIFIED;
-                } else if (stale) {
-                    flowState = VersionedFlowState.STALE;
-                } else {
-                    flowState = VersionedFlowState.UP_TO_DATE;
-                }
+                    final VersionedFlowState flowState;
+                    if (modified && stale) {
+                        flowState = VersionedFlowState.LOCALLY_MODIFIED_AND_STALE;
+                    } else if (modified) {
+                        flowState = VersionedFlowState.LOCALLY_MODIFIED;
+                    } else if (stale) {
+                        flowState = VersionedFlowState.STALE;
+                    } else {
+                        flowState = VersionedFlowState.UP_TO_DATE;
+                    }
 
-                return new StandardVersionedFlowStatus(flowState, flowState.getDescription());
+                    return new StandardVersionedFlowStatus(flowState, flowState.getDescription());
+                } catch (final Exception e) {
+                    LOG.warn("Could not correctly determine Versioned Flow Status for {}. Will consider state to be SYNC_FAILURE", this, e);
+                    return new StandardVersionedFlowStatus(VersionedFlowState.SYNC_FAILURE, "Could not properly determine flow status due to: " + e);
+                }
             }
         };
 
@@ -4909,15 +4926,16 @@ public final class StandardProcessGroup implements ProcessGroup {
             return null;
         }
 
-        final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(flowController.getExtensionManager());
-        final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(this, controllerServiceProvider, flowController.getFlowRegistryClient(), false);
+        try {
+            final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(flowController.getExtensionManager());
+            final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(this, controllerServiceProvider, flowController.getFlowRegistryClient(), false);
 
-        final ComparableDataFlow currentFlow = new StandardComparableDataFlow("Local Flow", versionedGroup);
-        final ComparableDataFlow snapshotFlow = new StandardComparableDataFlow("Versioned Flow", vci.getFlowSnapshot());
+            final ComparableDataFlow currentFlow = new StandardComparableDataFlow("Local Flow", versionedGroup);
+            final ComparableDataFlow snapshotFlow = new StandardComparableDataFlow("Versioned Flow", vci.getFlowSnapshot());
 
-        final FlowComparator flowComparator = new StandardFlowComparator(snapshotFlow, currentFlow, getAncestorServiceIds(), new EvolvingDifferenceDescriptor());
-        final FlowComparison comparison = flowComparator.compare();
-        final Set<FlowDifference> differences = comparison.getDifferences().stream()
+            final FlowComparator flowComparator = new StandardFlowComparator(snapshotFlow, currentFlow, getAncestorServiceIds(), new EvolvingDifferenceDescriptor());
+            final FlowComparison comparison = flowComparator.compare();
+            final Set<FlowDifference> differences = comparison.getDifferences().stream()
                 .filter(difference -> difference.getDifferenceType() != DifferenceType.BUNDLE_CHANGED)
                 .filter(FlowDifferenceFilters.FILTER_ADDED_REMOVED_REMOTE_PORTS)
                 .filter(FlowDifferenceFilters.FILTER_PUBLIC_PORT_NAME_CHANGES)
@@ -4927,8 +4945,11 @@ public final class StandardProcessGroup implements ProcessGroup {
                 .filter(diff -> !FlowDifferenceFilters.isScheduledStateNew(diff))
                 .collect(Collectors.toCollection(HashSet::new));
 
-        LOG.debug("There are {} differences between this Local Flow and the Versioned Flow: {}", differences.size(), differences);
-        return differences;
+            LOG.debug("There are {} differences between this Local Flow and the Versioned Flow: {}", differences.size(), differences);
+            return differences;
+        } catch (final RuntimeException e) {
+            throw new RuntimeException("Could not compute differences between local flow and Versioned Flow in NiFi Registry for " + this, e);
+        }
     }
 
 
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java
index 4432620..e30c00c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java
@@ -62,10 +62,16 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceRepository;
 import org.apache.nifi.registry.flow.VersionControlInformation;
+import org.apache.nifi.registry.flow.VersionedFlowState;
+import org.apache.nifi.registry.flow.VersionedFlowStatus;
 import org.apache.nifi.remote.PublicPort;
 import org.apache.nifi.remote.RemoteGroupPort;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class StandardEventAccess implements UserAwareEventAccess {
+    private static final Logger logger = LoggerFactory.getLogger(StandardEventAccess.class);
+
     private final FlowFileEventRepository flowFileEventRepository;
     private final FlowController flowController;
     private final StatusAnalyticsEngine statusAnalyticsEngine;
@@ -552,8 +558,16 @@ public class StandardEventAccess implements UserAwareEventAccess {
         status.setBytesTransferred(bytesTransferred);
 
         final VersionControlInformation vci = group.getVersionControlInformation();
-        if (vci != null && vci.getStatus() != null && vci.getStatus().getState() != null) {
-            status.setVersionedFlowState(vci.getStatus().getState());
+        if (vci != null) {
+            try {
+                final VersionedFlowStatus flowStatus = vci.getStatus();
+                if (flowStatus != null && flowStatus.getState() != null) {
+                    status.setVersionedFlowState(flowStatus.getState());
+                }
+            } catch (final Exception e) {
+                logger.warn("Failed to determine Version Control State for {}. Will consider state to be SYNC_FAILURE", group, e);
+                status.setVersionedFlowState(VersionedFlowState.SYNC_FAILURE);
+            }
         }
 
         return status;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java
index bc64d5e..ee1512c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java
@@ -36,6 +36,7 @@ import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedComponent;
 import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedControllerService;
 import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessor;
 
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Objects;
 import java.util.Optional;
@@ -196,8 +197,8 @@ public class FlowDifferenceFilters {
         // Determine if this Flow Difference indicates that Processor B has all of the same Auto-Terminated Relationships as Processor A, plus some.
         // If that is the case, then it may be that a new Relationship was added, defaulting to 'Auto-Terminated' and that Processor B is still auto-terminated.
         // We want to be able to identify that case.
-        final Set<String> autoTerminatedA = processorA.getAutoTerminatedRelationships();
-        final Set<String> autoTerminatedB = processorB.getAutoTerminatedRelationships();
+        final Set<String> autoTerminatedA = replaceNull(processorA.getAutoTerminatedRelationships(), Collections.emptySet());
+        final Set<String> autoTerminatedB = replaceNull(processorB.getAutoTerminatedRelationships(), Collections.emptySet());
 
         // If B is smaller than A, then B cannot possibly contain all of A. So use that as a first comparison to avoid the expense of #containsAll
         if (autoTerminatedB.size() < autoTerminatedA.size() || !autoTerminatedB.containsAll(autoTerminatedA)) {
@@ -233,6 +234,9 @@ public class FlowDifferenceFilters {
         return true;
     }
 
+    private static <T> T replaceNull(final T value, final T replacement) {
+        return value == null ? replacement : value;
+    }
 
     /**
      * Determines whether or not the given Process Group has a Connection whose source is the given Processor and that contains the given relationship