You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tp...@apache.org on 2021/06/01 12:38:34 UTC
[nifi] branch main updated: NIFI-8640 Regression with NIFI-8522
NiFi can duplicate controller service during template generation
This is an automated email from the ASF dual-hosted git repository.
tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new f23dcb0 NIFI-8640 Regression with NIFI-8522 NiFi can duplicate controller service during template generation
f23dcb0 is described below
commit f23dcb05f6d6c869fab453634bf094a01b42d121
Author: Timea Barna <ti...@gmail.com>
AuthorDate: Mon May 31 12:52:40 2021 +0200
NIFI-8640 Regression with NIFI-8522 NiFi can duplicate controller service during template generation
This closes #5109.
Signed-off-by: Tamas Palfy <ta...@gmail.com>
---
.../org/apache/nifi/web/util/SnippetUtils.java | 146 ++++++++++++++-------
1 file changed, 97 insertions(+), 49 deletions(-)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java
index bde2f95..5d3cfc6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java
@@ -75,6 +75,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
+import java.util.stream.Collectors;
/**
* Template utilities.
@@ -145,8 +146,9 @@ public final class SnippetUtils {
// we are talking only about the DTO objects that make up the snippet. We do not actually modify the Process Group or the
// Controller Services in our flow themselves!)
final Set<ControllerServiceDTO> allServicesReferenced = new HashSet<>();
- final Map<String, FlowSnippetDTO> contentsByGroup = new HashMap<>();
- contentsByGroup.put(processGroup.getIdentifier(), snippetDto);
+ final Map<String, ProcessGroupDTO> contentsByGroup = new HashMap<>();
+ final ProcessGroupDTO highestProcessGroupDTO = dtoFactory.createProcessGroupDto(processGroup, recurse);
+ contentsByGroup.put(processGroup.getIdentifier(), highestProcessGroupDTO);
// add any processors
final Set<ControllerServiceDTO> controllerServices = new HashSet<>();
@@ -162,13 +164,13 @@ public final class SnippetUtils {
if (includeControllerServices) {
// Include all referenced services that are not already included in this snippet.
getControllerServices(processor.getEffectivePropertyValues()).stream()
- .filter(allServicesReferenced::add)
- .forEach(svc -> {
- final String svcGroupId = svc.getParentGroupId();
- final String destinationGroupId = contentsByGroup.containsKey(svcGroupId) ? svcGroupId : processGroup.getIdentifier();
- svc.setParentGroupId(destinationGroupId);
- controllerServices.add(svc);
- });
+ .filter(allServicesReferenced::add)
+ .forEach(svc -> {
+ final String svcGroupId = svc.getParentGroupId();
+ final String destinationGroupId = contentsByGroup.containsKey(svcGroupId) ? svcGroupId : processGroup.getIdentifier();
+ svc.setParentGroupId(destinationGroupId);
+ controllerServices.add(svc);
+ });
}
}
}
@@ -234,42 +236,33 @@ public final class SnippetUtils {
}
// add any process groups
- final ProcessGroupDTO highestProcessGroupDTO = dtoFactory.createProcessGroupDto(processGroup, recurse);
- final Set<ProcessGroupDTO> processGroups = highestProcessGroupDTO.getContents().getProcessGroups();
- fillContentsByGroupMap(highestProcessGroupDTO, contentsByGroup);
-
- // Maintain a listing of visited groups starting with each group in the snippet.
- // This is used to determine whether a referenced controller service should be included in the resulting snippet.
- // If the service is defined at groupId or one of it's ancestors, its considered outside of this snippet
- // and will only be included when the includeControllerServices is set to true.
- // This happens above when considering the processors in this snippet.
- final Set<String> visitedGroupIds = new HashSet<>();
- for (final String groupIdentifier : contentsByGroup.keySet()) {
-
- // Include this group in the ancestry for this snippet, services only get included if the includeControllerServices
- // flag is set or if the service is defined within this groups hierarchy within the snippet.
- if (!groupIdentifier.equals(processGroup.getIdentifier())) {
- visitedGroupIds.add(groupIdentifier);
- }
-
- for (final ProcessorNode procNode : flowController.getFlowManager().getGroup(groupIdentifier).getProcessors()) {
- // Include all referenced services that are not already included in this snippet.
- getControllerServices(procNode.getEffectivePropertyValues()).stream()
- .filter(allServicesReferenced::add)
- .filter(svc -> includeControllerServices || visitedGroupIds.contains(svc.getParentGroupId()))
- .forEach(svc -> {
- final String svcGroupId = svc.getParentGroupId();
- final String destinationGroupId = contentsByGroup.containsKey(svcGroupId) ? svcGroupId : processGroup.getIdentifier();
- svc.setParentGroupId(destinationGroupId);
- final FlowSnippetDTO contents = contentsByGroup.get(destinationGroupId);
- Set<ControllerServiceDTO> services = contents.getControllerServices();
- if (services == null) {
- contents.setControllerServices(Collections.singleton(svc));
- } else {
- services.add(svc);
- contents.setControllerServices(services);
- }
- });
+ final Set<ProcessGroupDTO> processGroups = new LinkedHashSet<>();
+ if (!snippet.getProcessGroups().isEmpty()) {
+ Set<String> snippetGroupIds = snippet.getProcessGroups().keySet();
+
+ for (final ProcessGroupDTO group: highestProcessGroupDTO.getContents().getProcessGroups()) {
+ if (snippetGroupIds.contains(group.getId())) {
+ contentsByGroup.put(group.getId(), group);
+ addChildren(group, contentsByGroup);
+ }
+ }
+
+ for (final String childGroupId : snippet.getProcessGroups().keySet()) {
+ final ProcessGroup childGroup = processGroup.getProcessGroup(childGroupId);
+ if (childGroup == null) {
+ throw new IllegalStateException("A process group in this snippet could not be found.");
+ }
+
+ ProcessGroupDTO childGroupDto = contentsByGroup.get(childGroupId);
+
+ // maintain a listing of visited groups starting with each group in the snippet. this is used to determine
+ // whether a referenced controller service should be included in the resulting snippet. if the service is
+ // defined at groupId or one of its ancestors, it's considered outside of this snippet and will only be included
+ // when the includeControllerServices is set to true. this happens above when considering the processors in this snippet
+ final Set<String> visitedGroupIds = new HashSet<>();
+ addControllerServices(childGroup, childGroupDto.getContents(), allServicesReferenced, includeControllerServices, visitedGroupIds, contentsByGroup, processGroup.getIdentifier());
+
+ processGroups.add(childGroupDto);
}
}
@@ -285,6 +278,7 @@ public final class SnippetUtils {
}
}
+
// Normalize the coordinates based on the locations of the other components
final List<? extends ComponentDTO> components = new ArrayList<>();
components.addAll((Set) processors);
@@ -297,7 +291,7 @@ public final class SnippetUtils {
components.addAll((Set) remoteProcessGroups);
normalizeCoordinates(components);
- Set<ControllerServiceDTO> updatedControllerServices = snippetDto.getControllerServices();
+ Set<ControllerServiceDTO> updatedControllerServices = contentsByGroup.get(processGroup.getIdentifier()).getContents().getControllerServices();
if (updatedControllerServices == null) {
updatedControllerServices = new HashSet<>();
}
@@ -316,10 +310,63 @@ public final class SnippetUtils {
return snippetDto;
}
- private void fillContentsByGroupMap(final ProcessGroupDTO processGroup, final Map<String, FlowSnippetDTO> contentByGroupMap) {
+ private void addChildren(final ProcessGroupDTO processGroup, final Map<String, ProcessGroupDTO> contentByGroupMap) {
for (final ProcessGroupDTO group: processGroup.getContents().getProcessGroups()) {
- contentByGroupMap.put(group.getId(), group.getContents());
- fillContentsByGroupMap(group, contentByGroupMap);
+ contentByGroupMap.put(group.getId(), group);
+ addChildren(group, contentByGroupMap);
+ }
+ }
+
+ /**
+ * Finds all Controller Services that are referenced in the given Process Group (and child Process Groups, recursively), and
+ * adds each one to the contents of its destination group, looked up in the given contentsByGroup map
+ *
+ * @param group the Process Group to start from
+ * @param contents Process Group's contents
+ * @param allServicesReferenced a Set of all Controller Service DTO's that have already been referenced; used to dedupe services
+ * @param contentsByGroup a Map of Process Group ID to that Process Group's DTO; services are added to the DTO's contents
+ * @param highestGroupId the UUID of the 'highest' process group in the snippet
+ */
+ private void addControllerServices(final ProcessGroup group, final FlowSnippetDTO contents, final Set<ControllerServiceDTO> allServicesReferenced,
+ final boolean includeControllerServices, final Set<String> visitedGroupIds, final Map<String, ProcessGroupDTO> contentsByGroup, final String highestGroupId) {
+
+ // include this group in the ancestry for this snippet; services only get included if the includeControllerServices
+ // flag is set or if the service is defined within this group's hierarchy within the snippet
+ visitedGroupIds.add(group.getIdentifier());
+
+ for (final ProcessorNode procNode : group.getProcessors()) {
+ // Include all referenced services that are not already included in this snippet.
+ getControllerServices(procNode.getEffectivePropertyValues()).stream()
+ .filter(allServicesReferenced::add)
+ .filter(svc -> includeControllerServices || visitedGroupIds.contains(svc.getParentGroupId()))
+ .forEach(svc -> {
+ final String svcGroupId = svc.getParentGroupId();
+ final String destinationGroupId = contentsByGroup.containsKey(svcGroupId) ? svcGroupId : highestGroupId;
+ svc.setParentGroupId(destinationGroupId);
+ final FlowSnippetDTO snippetDto = contentsByGroup.get(destinationGroupId).getContents();
+ if (snippetDto != null) {
+ Set<ControllerServiceDTO> services = snippetDto.getControllerServices();
+ if (services == null) {
+ snippetDto.setControllerServices(Collections.singleton(svc));
+ } else {
+ services.add(svc);
+ snippetDto.setControllerServices(services);
+ }
+ }
+ });
+ }
+
+ // Map child process group ID to the child process group for easy lookup
+ final Map<String, ProcessGroupDTO> childGroupMap = contents.getProcessGroups().stream()
+ .collect(Collectors.toMap(ComponentDTO::getId, childGroupDto -> childGroupDto));
+
+ for (final ProcessGroup childGroup : group.getProcessGroups()) {
+ final ProcessGroupDTO childDto = childGroupMap.get(childGroup.getIdentifier());
+ if (childDto == null) {
+ continue;
+ }
+
+ addControllerServices(childGroup, childDto.getContents(), allServicesReferenced, includeControllerServices, visitedGroupIds, contentsByGroup, highestGroupId);
}
}
@@ -345,6 +392,7 @@ public final class SnippetUtils {
return serviceDtos;
}
+
public FlowSnippetDTO copy(final FlowSnippetDTO snippetContents, final ProcessGroup group, final String idGenerationSeed, boolean isCopy) {
final FlowSnippetDTO snippetCopy = copyContentsForGroup(snippetContents, group.getIdentifier(), null, null, idGenerationSeed, isCopy);
resolveNameConflicts(snippetCopy, group);