You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tp...@apache.org on 2021/06/01 12:38:34 UTC

[nifi] branch main updated: NIFI-8640 Regression with NIFI-8522 NiFi can duplicate controller service during template generation

This is an automated email from the ASF dual-hosted git repository.

tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new f23dcb0  NIFI-8640 Regression with NIFI-8522 NiFi can duplicate controller service during template generation
f23dcb0 is described below

commit f23dcb05f6d6c869fab453634bf094a01b42d121
Author: Timea Barna <ti...@gmail.com>
AuthorDate: Mon May 31 12:52:40 2021 +0200

    NIFI-8640 Regression with NIFI-8522 NiFi can duplicate controller service during template generation
    
    This closes #5109.
    
    Signed-off-by: Tamas Palfy <ta...@gmail.com>
---
 .../org/apache/nifi/web/util/SnippetUtils.java     | 146 ++++++++++++++-------
 1 file changed, 97 insertions(+), 49 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java
index bde2f95..5d3cfc6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java
@@ -75,6 +75,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.UUID;
+import java.util.stream.Collectors;
 
 /**
  * Template utilities.
@@ -145,8 +146,9 @@ public final class SnippetUtils {
         // we are talking only about the DTO objects that make up the snippet. We do not actually modify the Process Group or the
         // Controller Services in our flow themselves!)
         final Set<ControllerServiceDTO> allServicesReferenced = new HashSet<>();
-        final Map<String, FlowSnippetDTO> contentsByGroup = new HashMap<>();
-        contentsByGroup.put(processGroup.getIdentifier(), snippetDto);
+        final Map<String, ProcessGroupDTO> contentsByGroup = new HashMap<>();
+        final ProcessGroupDTO highestProcessGroupDTO = dtoFactory.createProcessGroupDto(processGroup, recurse);
+        contentsByGroup.put(processGroup.getIdentifier(), highestProcessGroupDTO);
 
         // add any processors
         final Set<ControllerServiceDTO> controllerServices = new HashSet<>();
@@ -162,13 +164,13 @@ public final class SnippetUtils {
                 if (includeControllerServices) {
                     // Include all referenced services that are not already included in this snippet.
                     getControllerServices(processor.getEffectivePropertyValues()).stream()
-                            .filter(allServicesReferenced::add)
-                            .forEach(svc -> {
-                                final String svcGroupId = svc.getParentGroupId();
-                                final String destinationGroupId = contentsByGroup.containsKey(svcGroupId) ? svcGroupId : processGroup.getIdentifier();
-                                svc.setParentGroupId(destinationGroupId);
-                                controllerServices.add(svc);
-                            });
+                        .filter(allServicesReferenced::add)
+                        .forEach(svc -> {
+                            final String svcGroupId = svc.getParentGroupId();
+                            final String destinationGroupId = contentsByGroup.containsKey(svcGroupId) ? svcGroupId : processGroup.getIdentifier();
+                            svc.setParentGroupId(destinationGroupId);
+                            controllerServices.add(svc);
+                        });
                 }
             }
         }
@@ -234,42 +236,33 @@ public final class SnippetUtils {
         }
 
         // add any process groups
-        final ProcessGroupDTO highestProcessGroupDTO = dtoFactory.createProcessGroupDto(processGroup, recurse);
-        final Set<ProcessGroupDTO> processGroups = highestProcessGroupDTO.getContents().getProcessGroups();
-        fillContentsByGroupMap(highestProcessGroupDTO, contentsByGroup);
-
-        // Maintain a listing of visited groups starting with each group in the snippet.
-        // This is used to determine whether a referenced controller service should be included in the resulting snippet.
-        // If the service is defined at groupId or one of it's ancestors, its considered outside of this snippet
-        // and will only be included when the includeControllerServices is set to true.
-        // This happens above when considering the processors in this snippet.
-        final Set<String> visitedGroupIds = new HashSet<>();
-        for (final String groupIdentifier : contentsByGroup.keySet()) {
-
-            // Include this group in the ancestry for this snippet, services only get included if the includeControllerServices
-            // flag is set or if the service is defined within this groups hierarchy within the snippet.
-            if (!groupIdentifier.equals(processGroup.getIdentifier())) {
-                visitedGroupIds.add(groupIdentifier);
-            }
-
-            for (final ProcessorNode procNode : flowController.getFlowManager().getGroup(groupIdentifier).getProcessors()) {
-                // Include all referenced services that are not already included in this snippet.
-                getControllerServices(procNode.getEffectivePropertyValues()).stream()
-                        .filter(allServicesReferenced::add)
-                        .filter(svc -> includeControllerServices || visitedGroupIds.contains(svc.getParentGroupId()))
-                        .forEach(svc -> {
-                            final String svcGroupId = svc.getParentGroupId();
-                            final String destinationGroupId = contentsByGroup.containsKey(svcGroupId) ? svcGroupId : processGroup.getIdentifier();
-                            svc.setParentGroupId(destinationGroupId);
-                            final FlowSnippetDTO contents = contentsByGroup.get(destinationGroupId);
-                            Set<ControllerServiceDTO> services = contents.getControllerServices();
-                            if (services == null) {
-                                contents.setControllerServices(Collections.singleton(svc));
-                            } else {
-                                services.add(svc);
-                                contents.setControllerServices(services);
-                            }
-                        });
+        final Set<ProcessGroupDTO> processGroups = new LinkedHashSet<>();
+        if (!snippet.getProcessGroups().isEmpty()) {
+            Set<String> snippetGroupIds = snippet.getProcessGroups().keySet();
+
+            for (final ProcessGroupDTO group: highestProcessGroupDTO.getContents().getProcessGroups()) {
+                if (snippetGroupIds.contains(group.getId())) {
+                    contentsByGroup.put(group.getId(), group);
+                    addChildren(group, contentsByGroup);
+                }
+            }
+
+            for (final String childGroupId : snippet.getProcessGroups().keySet()) {
+                final ProcessGroup childGroup = processGroup.getProcessGroup(childGroupId);
+                if (childGroup == null) {
+                    throw new IllegalStateException("A process group in this snippet could not be found.");
+                }
+
+                ProcessGroupDTO childGroupDto = contentsByGroup.get(childGroupId);
+
+                // maintain a listing of visited groups starting with each group in the snippet. this is used to determine
+                // whether a referenced controller service should be included in the resulting snippet. if the service is
+                // defined at groupId or one of it's ancestors, its considered outside of this snippet and will only be included
+                // when the includeControllerServices is set to true. this happens above when considering the processors in this snippet
+                final Set<String> visitedGroupIds = new HashSet<>();
+                addControllerServices(childGroup, childGroupDto.getContents(), allServicesReferenced, includeControllerServices, visitedGroupIds, contentsByGroup, processGroup.getIdentifier());
+
+                processGroups.add(childGroupDto);
             }
         }
 
@@ -285,6 +278,7 @@ public final class SnippetUtils {
             }
         }
 
+
         // Normalize the coordinates based on the locations of the other components
         final List<? extends ComponentDTO> components = new ArrayList<>();
         components.addAll((Set) processors);
@@ -297,7 +291,7 @@ public final class SnippetUtils {
         components.addAll((Set) remoteProcessGroups);
         normalizeCoordinates(components);
 
-        Set<ControllerServiceDTO> updatedControllerServices = snippetDto.getControllerServices();
+        Set<ControllerServiceDTO> updatedControllerServices = contentsByGroup.get(processGroup.getIdentifier()).getContents().getControllerServices();
         if (updatedControllerServices == null) {
             updatedControllerServices = new HashSet<>();
         }
@@ -316,10 +310,63 @@ public final class SnippetUtils {
         return snippetDto;
     }
 
-    private void fillContentsByGroupMap(final ProcessGroupDTO processGroup, final Map<String, FlowSnippetDTO> contentByGroupMap) {
+    private void addChildren(final ProcessGroupDTO processGroup, final Map<String, ProcessGroupDTO> contentByGroupMap) {
         for (final ProcessGroupDTO group: processGroup.getContents().getProcessGroups()) {
-            contentByGroupMap.put(group.getId(), group.getContents());
-            fillContentsByGroupMap(group, contentByGroupMap);
+            contentByGroupMap.put(group.getId(), group);
+            addChildren(group, contentByGroupMap);
+        }
+    }
+
+    /**
+     * Finds all Controller Services that are referenced in the given Process Group (and child Process Groups, recursively), and
+     * adds them to the given servicesByGroup map
+     *
+     * @param group the Process Group to start from
+     * @param contents Process Group's contents
+     * @param allServicesReferenced a Set of all Controller Service DTO's that have already been referenced; used to dedupe services
+     * @param contentsByGroup a Map of Process Group ID to the Process Group's contents
+     * @param highestGroupId the UUID of the 'highest' process group in the snippet
+     */
+    private void addControllerServices(final ProcessGroup group, final FlowSnippetDTO contents, final Set<ControllerServiceDTO> allServicesReferenced,
+        final boolean includeControllerServices, final Set<String> visitedGroupIds, final Map<String, ProcessGroupDTO> contentsByGroup, final String highestGroupId) {
+
+        // include this group in the ancestry for this snippet, services only get included if the includeControllerServices
+        // flag is set or if the service is defined within this groups hierarchy within the snippet
+        visitedGroupIds.add(group.getIdentifier());
+
+        for (final ProcessorNode procNode : group.getProcessors()) {
+            // Include all referenced services that are not already included in this snippet.
+            getControllerServices(procNode.getEffectivePropertyValues()).stream()
+                .filter(allServicesReferenced::add)
+                .filter(svc -> includeControllerServices || visitedGroupIds.contains(svc.getParentGroupId()))
+                .forEach(svc -> {
+                    final String svcGroupId = svc.getParentGroupId();
+                    final String destinationGroupId = contentsByGroup.containsKey(svcGroupId) ? svcGroupId : highestGroupId;
+                    svc.setParentGroupId(destinationGroupId);
+                    final FlowSnippetDTO snippetDto = contentsByGroup.get(destinationGroupId).getContents();
+                    if (snippetDto != null) {
+                        Set<ControllerServiceDTO> services = snippetDto.getControllerServices();
+                        if (services == null) {
+                            snippetDto.setControllerServices(Collections.singleton(svc));
+                        } else {
+                            services.add(svc);
+                            snippetDto.setControllerServices(services);
+                        }
+                    }
+                });
+        }
+
+        // Map child process group ID to the child process group for easy lookup
+        final Map<String, ProcessGroupDTO> childGroupMap = contents.getProcessGroups().stream()
+            .collect(Collectors.toMap(ComponentDTO::getId, childGroupDto -> childGroupDto));
+
+        for (final ProcessGroup childGroup : group.getProcessGroups()) {
+            final ProcessGroupDTO childDto = childGroupMap.get(childGroup.getIdentifier());
+            if (childDto == null) {
+                continue;
+            }
+
+            addControllerServices(childGroup, childDto.getContents(), allServicesReferenced, includeControllerServices, visitedGroupIds, contentsByGroup, highestGroupId);
         }
     }
 
@@ -345,6 +392,7 @@ public final class SnippetUtils {
         return serviceDtos;
     }
 
+
     public FlowSnippetDTO copy(final FlowSnippetDTO snippetContents, final ProcessGroup group, final String idGenerationSeed, boolean isCopy) {
         final FlowSnippetDTO snippetCopy = copyContentsForGroup(snippetContents, group.getIdentifier(), null, null, idGenerationSeed, isCopy);
         resolveNameConflicts(snippetCopy, group);