You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2018/10/12 20:27:17 UTC

nifi git commit: NIFI-5695: Fixed bug that caused ports to not properly map to their correct child group on Flow Import if the child group is independently versioned

Repository: nifi
Updated Branches:
  refs/heads/master 5eb5e96b1 -> 270ce8570


NIFI-5695: Fixed bug that caused ports to not properly map to their correct child group on Flow Import if the child group is independently versioned

This closes #3070.

Signed-off-by: Bryan Bende <bb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/270ce857
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/270ce857
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/270ce857

Branch: refs/heads/master
Commit: 270ce8570df7a00a26f431d8d8ae0245b898bf69
Parents: 5eb5e96
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Oct 12 15:27:10 2018 -0400
Committer: Bryan Bende <bb...@apache.org>
Committed: Fri Oct 12 16:26:59 2018 -0400

----------------------------------------------------------------------
 .../nifi/groups/StandardProcessGroup.java       | 36 ++++++++++++++
 .../flow/mapping/NiFiRegistryFlowMapper.java    | 51 +++++++++++---------
 2 files changed, 63 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/270ce857/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 43578e4..a683a9e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -3933,6 +3933,24 @@ public final class StandardProcessGroup implements ProcessGroup {
                     return port.get();
                 }
 
+                // Attempt to locate child group by versioned component id
+                final Optional<ProcessGroup> optionalSpecifiedGroup = group.getProcessGroups().stream()
+                    .filter(child -> child.getVersionedComponentId().isPresent())
+                    .filter(child -> child.getVersionedComponentId().get().equals(connectableComponent.getGroupId()))
+                    .findFirst();
+
+                if (optionalSpecifiedGroup.isPresent()) {
+                    final ProcessGroup specifiedGroup = optionalSpecifiedGroup.get();
+                    return specifiedGroup.getInputPorts().stream()
+                        .filter(component -> component.getVersionedComponentId().isPresent())
+                        .filter(component -> id.equals(component.getVersionedComponentId().get()))
+                        .findAny()
+                        .orElse(null);
+                }
+
+                // If no child group matched the versioned component id, then look at all child groups. This is done because
+                // in older versions, we did not properly map Versioned Component ID's to Ports' parent groups. As a result,
+                // if the flow doesn't contain the properly mapped group id, we need to search all child groups.
                 return group.getProcessGroups().stream()
                     .flatMap(gr -> gr.getInputPorts().stream())
                     .filter(component -> component.getVersionedComponentId().isPresent())
@@ -3950,6 +3968,24 @@ public final class StandardProcessGroup implements ProcessGroup {
                     return port.get();
                 }
 
+                // Attempt to locate child group by versioned component id
+                final Optional<ProcessGroup> optionalSpecifiedGroup = group.getProcessGroups().stream()
+                    .filter(child -> child.getVersionedComponentId().isPresent())
+                    .filter(child -> child.getVersionedComponentId().get().equals(connectableComponent.getGroupId()))
+                    .findFirst();
+
+                if (optionalSpecifiedGroup.isPresent()) {
+                    final ProcessGroup specifiedGroup = optionalSpecifiedGroup.get();
+                    return specifiedGroup.getOutputPorts().stream()
+                        .filter(component -> component.getVersionedComponentId().isPresent())
+                        .filter(component -> id.equals(component.getVersionedComponentId().get()))
+                        .findAny()
+                        .orElse(null);
+                }
+
+                // If no child group matched the versioned component id, then look at all child groups. This is done because
+                // in older versions, we did not properly map Versioned Component ID's to Ports' parent groups. As a result,
+                // if the flow doesn't contain the properly mapped group id, we need to search all child groups.
                 return group.getProcessGroups().stream()
                     .flatMap(gr -> gr.getOutputPorts().stream())
                     .filter(component -> component.getVersionedComponentId().isPresent())

http://git-wip-us.apache.org/repos/asf/nifi/blob/270ce857/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java
index 521b078..074302a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java
@@ -71,6 +71,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 
@@ -228,6 +229,20 @@ public class NiFiRegistryFlowMapper {
         return versionedId;
     }
 
+    private <E extends Exception> String getIdOrThrow(final Optional<String> currentVersionedId, final String componentId, final Supplier<E> exceptionSupplier) throws E {
+        if (currentVersionedId.isPresent()) {
+            return currentVersionedId.get();
+        } else {
+            final String resolved = versionedComponentIds.get(componentId);
+            if (resolved == null) {
+                throw exceptionSupplier.get();
+            }
+
+            return resolved;
+        }
+    }
+
+
     private String getGroupId(final String groupId) {
         return versionedComponentIds.get(groupId);
     }
@@ -265,39 +280,27 @@ public class NiFiRegistryFlowMapper {
     public ConnectableComponent mapConnectable(final Connectable connectable) {
         final ConnectableComponent component = new InstantiatedConnectableComponent(connectable.getIdentifier(), connectable.getProcessGroupIdentifier());
 
-        final Optional<String> versionedId = connectable.getVersionedComponentId();
-        if (versionedId.isPresent()) {
-            component.setId(versionedId.get());
-        } else {
-            final String resolved = versionedComponentIds.get(connectable.getIdentifier());
-            if (resolved == null) {
-                throw new IllegalArgumentException("Unable to map Connectable Component with identifier " + connectable.getIdentifier() + " to any version-controlled component");
-            }
-
-            component.setId(resolved);
-        }
+        final String versionedId = getIdOrThrow(connectable.getVersionedComponentId(), connectable.getIdentifier(),
+            () -> new IllegalArgumentException("Unable to map Connectable Component with identifier " + connectable.getIdentifier() + " to any version-controlled component"));
+        component.setId(versionedId);
 
         component.setComments(connectable.getComments());
+
+        final String groupId;
         if (connectable instanceof RemoteGroupPort) {
             final RemoteGroupPort port = (RemoteGroupPort) connectable;
             final RemoteProcessGroup rpg = port.getRemoteProcessGroup();
             final Optional<String> rpgVersionedId = rpg.getVersionedComponentId();
-            final String groupId;
-            if (rpgVersionedId.isPresent()) {
-                groupId = rpgVersionedId.get();
-            } else {
-                final String resolved = versionedComponentIds.get(rpg.getIdentifier());
-                if (resolved == null) {
-                    throw new IllegalArgumentException("Unable to find the Versioned Component ID for Remote Process Group that " + connectable + " belongs to");
-                }
-
-                groupId = resolved;
-            }
+            groupId = getIdOrThrow(rpgVersionedId, rpg.getIdentifier(),
+                () -> new IllegalArgumentException("Unable to find the Versioned Component ID for Remote Process Group that " + connectable + " belongs to"));
 
-            component.setGroupId(groupId);
         } else {
-            component.setGroupId(connectable.getProcessGroupIdentifier());
+            groupId = getIdOrThrow(connectable.getProcessGroup().getVersionedComponentId(), connectable.getProcessGroupIdentifier(),
+                () -> new IllegalArgumentException("Unable to find the Versioned Component ID for the Process Group that " + connectable + " belongs to"));
         }
+
+        component.setGroupId(groupId);
+
         component.setName(connectable.getName());
         component.setType(ConnectableComponentType.valueOf(connectable.getConnectableType().name()));
         return component;