You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2020/05/15 20:16:51 UTC
[nifi] branch master updated: NIFI-7460: Avoid NPE when a
VersionedProcessor has a null value for autoTerminatedRelationships. Added
additional logging and improved error handling around syncing with invalid
flows
This is an automated email from the ASF dual-hosted git repository.
bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new c51b905 NIFI-7460: Avoid NPE when a VersionedProcessor has a null value for autoTerminatedRelationships. Added additional logging and improved error handling around syncing with invalid flows
c51b905 is described below
commit c51b9051a8f503af1e1793c952b150895a0399ce
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Fri May 15 14:25:51 2020 -0400
NIFI-7460: Avoid NPE when a VersionedProcessor has a null value for autoTerminatedRelationships. Added additional logging and improved error handling around syncing with invalid flows
---
.../apache/nifi/groups/StandardProcessGroup.java | 79 ++++++++++++++--------
.../apache/nifi/reporting/StandardEventAccess.java | 18 ++++-
.../apache/nifi/util/FlowDifferenceFilters.java | 8 ++-
3 files changed, 72 insertions(+), 33 deletions(-)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index e7af89c..0c091d8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -374,7 +374,16 @@ public final class StandardProcessGroup implements ProcessGroup {
// update the vci counts for this child group
final VersionControlInformation vci = childGroup.getVersionControlInformation();
if (vci != null) {
- switch (vci.getStatus().getState()) {
+ final VersionedFlowStatus flowStatus;
+ try {
+ flowStatus = vci.getStatus();
+ } catch (final Exception e) {
+ LOG.warn("Could not determine Version Control State for {}. Will consider state to be SYNC_FAILURE", this, e);
+ syncFailure++;
+ continue;
+ }
+
+ switch (flowStatus.getState()) {
case LOCALLY_MODIFIED:
locallyModified++;
break;
@@ -1646,7 +1655,10 @@ public final class StandardProcessGroup implements ProcessGroup {
@Override
public String toString() {
- return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("identifier", getIdentifier()).toString();
+ return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
+ .append("identifier", getIdentifier())
+ .append("name", getName())
+ .toString();
}
@Override
@@ -3345,28 +3357,33 @@ public final class StandardProcessGroup implements ProcessGroup {
return new StandardVersionedFlowStatus(VersionedFlowState.SYNC_FAILURE, syncFailureExplanation);
}
- final boolean modified = isModified();
- if (!modified) {
- final VersionControlInformation vci = StandardProcessGroup.this.versionControlInfo.get();
- if (vci.getFlowSnapshot() == null) {
- return new StandardVersionedFlowStatus(VersionedFlowState.SYNC_FAILURE, "Process Group has not yet been synchronized with Flow Registry");
+ try {
+ final boolean modified = isModified();
+ if (!modified) {
+ final VersionControlInformation vci = StandardProcessGroup.this.versionControlInfo.get();
+ if (vci.getFlowSnapshot() == null) {
+ return new StandardVersionedFlowStatus(VersionedFlowState.SYNC_FAILURE, "Process Group has not yet been synchronized with Flow Registry");
+ }
}
- }
- final boolean stale = versionControlFields.isStale();
+ final boolean stale = versionControlFields.isStale();
- final VersionedFlowState flowState;
- if (modified && stale) {
- flowState = VersionedFlowState.LOCALLY_MODIFIED_AND_STALE;
- } else if (modified) {
- flowState = VersionedFlowState.LOCALLY_MODIFIED;
- } else if (stale) {
- flowState = VersionedFlowState.STALE;
- } else {
- flowState = VersionedFlowState.UP_TO_DATE;
- }
+ final VersionedFlowState flowState;
+ if (modified && stale) {
+ flowState = VersionedFlowState.LOCALLY_MODIFIED_AND_STALE;
+ } else if (modified) {
+ flowState = VersionedFlowState.LOCALLY_MODIFIED;
+ } else if (stale) {
+ flowState = VersionedFlowState.STALE;
+ } else {
+ flowState = VersionedFlowState.UP_TO_DATE;
+ }
- return new StandardVersionedFlowStatus(flowState, flowState.getDescription());
+ return new StandardVersionedFlowStatus(flowState, flowState.getDescription());
+ } catch (final Exception e) {
+ LOG.warn("Could not correctly determine Versioned Flow Status for {}. Will consider state to be SYNC_FAILURE", this, e);
+ return new StandardVersionedFlowStatus(VersionedFlowState.SYNC_FAILURE, "Could not properly determine flow status due to: " + e);
+ }
}
};
@@ -4909,15 +4926,16 @@ public final class StandardProcessGroup implements ProcessGroup {
return null;
}
- final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(flowController.getExtensionManager());
- final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(this, controllerServiceProvider, flowController.getFlowRegistryClient(), false);
+ try {
+ final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(flowController.getExtensionManager());
+ final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(this, controllerServiceProvider, flowController.getFlowRegistryClient(), false);
- final ComparableDataFlow currentFlow = new StandardComparableDataFlow("Local Flow", versionedGroup);
- final ComparableDataFlow snapshotFlow = new StandardComparableDataFlow("Versioned Flow", vci.getFlowSnapshot());
+ final ComparableDataFlow currentFlow = new StandardComparableDataFlow("Local Flow", versionedGroup);
+ final ComparableDataFlow snapshotFlow = new StandardComparableDataFlow("Versioned Flow", vci.getFlowSnapshot());
- final FlowComparator flowComparator = new StandardFlowComparator(snapshotFlow, currentFlow, getAncestorServiceIds(), new EvolvingDifferenceDescriptor());
- final FlowComparison comparison = flowComparator.compare();
- final Set<FlowDifference> differences = comparison.getDifferences().stream()
+ final FlowComparator flowComparator = new StandardFlowComparator(snapshotFlow, currentFlow, getAncestorServiceIds(), new EvolvingDifferenceDescriptor());
+ final FlowComparison comparison = flowComparator.compare();
+ final Set<FlowDifference> differences = comparison.getDifferences().stream()
.filter(difference -> difference.getDifferenceType() != DifferenceType.BUNDLE_CHANGED)
.filter(FlowDifferenceFilters.FILTER_ADDED_REMOVED_REMOTE_PORTS)
.filter(FlowDifferenceFilters.FILTER_PUBLIC_PORT_NAME_CHANGES)
@@ -4927,8 +4945,11 @@ public final class StandardProcessGroup implements ProcessGroup {
.filter(diff -> !FlowDifferenceFilters.isScheduledStateNew(diff))
.collect(Collectors.toCollection(HashSet::new));
- LOG.debug("There are {} differences between this Local Flow and the Versioned Flow: {}", differences.size(), differences);
- return differences;
+ LOG.debug("There are {} differences between this Local Flow and the Versioned Flow: {}", differences.size(), differences);
+ return differences;
+ } catch (final RuntimeException e) {
+ throw new RuntimeException("Could not compute differences between local flow and Versioned Flow in NiFi Registry for " + this, e);
+ }
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java
index 4432620..e30c00c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java
@@ -62,10 +62,16 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceRepository;
import org.apache.nifi.registry.flow.VersionControlInformation;
+import org.apache.nifi.registry.flow.VersionedFlowState;
+import org.apache.nifi.registry.flow.VersionedFlowStatus;
import org.apache.nifi.remote.PublicPort;
import org.apache.nifi.remote.RemoteGroupPort;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class StandardEventAccess implements UserAwareEventAccess {
+ private static final Logger logger = LoggerFactory.getLogger(StandardEventAccess.class);
+
private final FlowFileEventRepository flowFileEventRepository;
private final FlowController flowController;
private final StatusAnalyticsEngine statusAnalyticsEngine;
@@ -552,8 +558,16 @@ public class StandardEventAccess implements UserAwareEventAccess {
status.setBytesTransferred(bytesTransferred);
final VersionControlInformation vci = group.getVersionControlInformation();
- if (vci != null && vci.getStatus() != null && vci.getStatus().getState() != null) {
- status.setVersionedFlowState(vci.getStatus().getState());
+ if (vci != null) {
+ try {
+ final VersionedFlowStatus flowStatus = vci.getStatus();
+ if (flowStatus != null && flowStatus.getState() != null) {
+ status.setVersionedFlowState(flowStatus.getState());
+ }
+ } catch (final Exception e) {
+ logger.warn("Failed to determine Version Control State for {}. Will consider state to be SYNC_FAILURE", group, e);
+ status.setVersionedFlowState(VersionedFlowState.SYNC_FAILURE);
+ }
}
return status;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java
index bc64d5e..ee1512c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java
@@ -36,6 +36,7 @@ import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedComponent;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedControllerService;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessor;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Optional;
@@ -196,8 +197,8 @@ public class FlowDifferenceFilters {
// Determine if this Flow Difference indicates that Processor B has all of the same Auto-Terminated Relationships as Processor A, plus some.
// If that is the case, then it may be that a new Relationship was added, defaulting to 'Auto-Terminated' and that Processor B is still auto-terminated.
// We want to be able to identify that case.
- final Set<String> autoTerminatedA = processorA.getAutoTerminatedRelationships();
- final Set<String> autoTerminatedB = processorB.getAutoTerminatedRelationships();
+ final Set<String> autoTerminatedA = replaceNull(processorA.getAutoTerminatedRelationships(), Collections.emptySet());
+ final Set<String> autoTerminatedB = replaceNull(processorB.getAutoTerminatedRelationships(), Collections.emptySet());
// If B is smaller than A, then B cannot possibly contain all of A. So use that as a first comparison to avoid the expense of #containsAll
if (autoTerminatedB.size() < autoTerminatedA.size() || !autoTerminatedB.containsAll(autoTerminatedA)) {
@@ -233,6 +234,9 @@ public class FlowDifferenceFilters {
return true;
}
+ private static <T> T replaceNull(final T value, final T replacement) {
+ return value == null ? replacement : value;
+ }
/**
* Determines whether or not the given Process Group has a Connection whose source is the given Processor and that contains the given relationship