You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2022/06/13 15:33:36 UTC
[nifi] 12/13: NIFI-10096 Corrected nested inherited Parameter Context loading
This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch support/nifi-1.16
in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 5113640b27423f0619a20f69a76f3ba1d085e498
Author: exceptionfactory <ex...@apache.org>
AuthorDate: Thu Jun 9 15:54:30 2022 -0500
NIFI-10096 Corrected nested inherited Parameter Context loading
Signed-off-by: Joe Gresock <jg...@gmail.com>
This closes #6114.
---
.../serialization/VersionedFlowSynchronizer.java | 73 +++++++++++++++-------
1 file changed, 50 insertions(+), 23 deletions(-)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
index 757ada77a2..17a91b0793 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
@@ -111,6 +111,7 @@ import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
@@ -151,7 +152,7 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer {
// determine if the controller already had flow sync'd to it
final boolean flowAlreadySynchronized = controller.isFlowSynchronized();
- logger.info("Synching FlowController with proposed flow: Controller Already Synchronized = {}", flowAlreadySynchronized);
+ logger.info("Synchronizing FlowController with proposed flow: Controller Already Synchronized = {}", flowAlreadySynchronized);
// If bundle update strategy is configured to allow for compatible bundles, update any components to use compatible bundles if
// the exact bundle does not exist.
@@ -529,50 +530,71 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer {
}
private void inheritParameterContexts(final FlowController controller, final VersionedDataflow dataflow) {
- final ParameterContextManager parameterContextManager = controller.getFlowManager().getParameterContextManager();
-
- // Add any parameter context that doesn't yet exist. We have to add all contexts before updating them because
- // one context may reference another. We need that reference to exist before we try to create the reference.
- final Map<String, ParameterContext> parameterContextsByName = parameterContextManager.getParameterContextNameMapping();
-
controller.getFlowManager().withParameterContextResolution(() -> {
- for (final VersionedParameterContext versionedParameterContext : dataflow.getParameterContexts()) {
- inheritParameterContext(versionedParameterContext, controller.getFlowManager(), parameterContextsByName);
+ final List<VersionedParameterContext> parameterContexts = dataflow.getParameterContexts();
+
+ // Build mapping of name to context for resolution of inherited contexts
+ final Map<String, VersionedParameterContext> namedParameterContexts = parameterContexts.stream()
+ .collect(
+ Collectors.toMap(VersionedParameterContext::getName, Function.identity())
+ );
+ for (final VersionedParameterContext versionedParameterContext : parameterContexts) {
+ inheritParameterContext(versionedParameterContext, controller.getFlowManager(), namedParameterContexts);
}
});
}
- private void inheritParameterContext(final VersionedParameterContext versionedParameterContext, final FlowManager flowManager, final Map<String, ParameterContext> parameterContextsByName) {
- final ParameterContext existingContext = parameterContextsByName.get(versionedParameterContext.getName());
+ private void inheritParameterContext(
+ final VersionedParameterContext versionedParameterContext,
+ final FlowManager flowManager,
+ final Map<String, VersionedParameterContext> namedParameterContexts
+ ) {
+ final ParameterContextManager contextManager = flowManager.getParameterContextManager();
+ final ParameterContext existingContext = contextManager.getParameterContextNameMapping().get(versionedParameterContext.getName());
if (existingContext == null) {
- addParameterContext(versionedParameterContext, flowManager);
+ addParameterContext(versionedParameterContext, flowManager, namedParameterContexts);
} else {
- updateParameterContext(versionedParameterContext, existingContext, flowManager);
+ updateParameterContext(versionedParameterContext, existingContext, flowManager, namedParameterContexts);
}
}
- private void addParameterContext(final VersionedParameterContext versionedParameterContext, final FlowManager flowManager) {
+ private void addParameterContext(
+ final VersionedParameterContext versionedParameterContext,
+ final FlowManager flowManager,
+ final Map<String, VersionedParameterContext> namedParameterContexts
+ ) {
final Map<String, Parameter> parameters = createParameterMap(versionedParameterContext);
final ParameterContextManager contextManager = flowManager.getParameterContextManager();
- final List<String> referenceIds = findReferencedParameterContextIds(versionedParameterContext, contextManager);
+ final List<String> referenceIds = findReferencedParameterContextIds(versionedParameterContext, contextManager, namedParameterContexts);
flowManager.createParameterContext(versionedParameterContext.getInstanceIdentifier(), versionedParameterContext.getName(), parameters, referenceIds);
logger.info("Added Parameter Context {}", versionedParameterContext.getName());
}
- private List<String> findReferencedParameterContextIds(final VersionedParameterContext versionedParameterContext, final ParameterContextManager contextManager) {
+ private List<String> findReferencedParameterContextIds(
+ final VersionedParameterContext versionedParameterContext,
+ final ParameterContextManager contextManager,
+ final Map<String, VersionedParameterContext> namedParameterContexts
+ ) {
final List<String> referenceIds = new ArrayList<>();
final Map<String, ParameterContext> parameterContextsByName = contextManager.getParameterContextNameMapping();
if (versionedParameterContext.getInheritedParameterContexts() != null) {
for (final String inheritedContextName : versionedParameterContext.getInheritedParameterContexts()) {
- final ParameterContext existingContext = parameterContextsByName.get(inheritedContextName);
- if (existingContext == null) {
- logger.warn("Parameter Context {} inherits from Parameter Context {} but cannot find a Parameter Context with name {}",
- versionedParameterContext.getName(), inheritedContextName, inheritedContextName);
+ // Lookup inherited Parameter Context Name in Versioned Data Flow
+ final VersionedParameterContext inheritedParameterContext = namedParameterContexts.get(inheritedContextName);
+ if (inheritedParameterContext == null) {
+ // Lookup inherited Parameter Context Name in Parameter Context Manager
+ final ParameterContext existingContext = parameterContextsByName.get(inheritedContextName);
+ if (existingContext == null) {
+ logger.warn("Parameter Context {} inherits from Parameter Context {} but cannot find a Parameter Context with name {}",
+ versionedParameterContext.getName(), inheritedContextName, inheritedContextName);
+ } else {
+ referenceIds.add(existingContext.getIdentifier());
+ }
} else {
- referenceIds.add(existingContext.getIdentifier());
+ referenceIds.add(inheritedParameterContext.getInstanceIdentifier());
}
}
}
@@ -607,7 +629,12 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer {
return parameters;
}
- private void updateParameterContext(final VersionedParameterContext versionedParameterContext, final ParameterContext parameterContext, final FlowManager flowManager) {
+ private void updateParameterContext(
+ final VersionedParameterContext versionedParameterContext,
+ final ParameterContext parameterContext,
+ final FlowManager flowManager,
+ final Map<String, VersionedParameterContext> namedParameterContexts
+ ) {
final Map<String, Parameter> parameters = createParameterMap(versionedParameterContext);
final Map<String, String> currentValues = new HashMap<>();
@@ -648,7 +675,7 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer {
}
final ParameterContextManager contextManager = flowManager.getParameterContextManager();
- final List<String> inheritedContextIds = findReferencedParameterContextIds(versionedParameterContext, contextManager);
+ final List<String> inheritedContextIds = findReferencedParameterContextIds(versionedParameterContext, contextManager, namedParameterContexts);
final List<ParameterContext> referencedContexts = inheritedContextIds.stream()
.map(contextManager::getParameterContext)
.collect(Collectors.toList());