You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2018/10/12 20:27:17 UTC
nifi git commit: NIFI-5695: Fixed bug that caused ports to not
properly map to their correct child group on Flow Import if the child group
is independently versioned
Repository: nifi
Updated Branches:
refs/heads/master 5eb5e96b1 -> 270ce8570
NIFI-5695: Fixed bug that caused ports to not properly map to their correct child group on Flow Import if the child group is independently versioned
This closes #3070.
Signed-off-by: Bryan Bende <bb...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/270ce857
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/270ce857
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/270ce857
Branch: refs/heads/master
Commit: 270ce8570df7a00a26f431d8d8ae0245b898bf69
Parents: 5eb5e96
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Oct 12 15:27:10 2018 -0400
Committer: Bryan Bende <bb...@apache.org>
Committed: Fri Oct 12 16:26:59 2018 -0400
----------------------------------------------------------------------
.../nifi/groups/StandardProcessGroup.java | 36 ++++++++++++++
.../flow/mapping/NiFiRegistryFlowMapper.java | 51 +++++++++++---------
2 files changed, 63 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/270ce857/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 43578e4..a683a9e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -3933,6 +3933,24 @@ public final class StandardProcessGroup implements ProcessGroup {
return port.get();
}
+ // Attempt to locate child group by versioned component id
+ final Optional<ProcessGroup> optionalSpecifiedGroup = group.getProcessGroups().stream()
+ .filter(child -> child.getVersionedComponentId().isPresent())
+ .filter(child -> child.getVersionedComponentId().get().equals(connectableComponent.getGroupId()))
+ .findFirst();
+
+ if (optionalSpecifiedGroup.isPresent()) {
+ final ProcessGroup specifiedGroup = optionalSpecifiedGroup.get();
+ return specifiedGroup.getInputPorts().stream()
+ .filter(component -> component.getVersionedComponentId().isPresent())
+ .filter(component -> id.equals(component.getVersionedComponentId().get()))
+ .findAny()
+ .orElse(null);
+ }
+
+ // If no child group matched the versioned component id, then look at all child groups. This is done because
+ // in older versions, we did not properly map Versioned Component ID's to Ports' parent groups. As a result,
+ // if the flow doesn't contain the properly mapped group id, we need to search all child groups.
return group.getProcessGroups().stream()
.flatMap(gr -> gr.getInputPorts().stream())
.filter(component -> component.getVersionedComponentId().isPresent())
@@ -3950,6 +3968,24 @@ public final class StandardProcessGroup implements ProcessGroup {
return port.get();
}
+ // Attempt to locate child group by versioned component id
+ final Optional<ProcessGroup> optionalSpecifiedGroup = group.getProcessGroups().stream()
+ .filter(child -> child.getVersionedComponentId().isPresent())
+ .filter(child -> child.getVersionedComponentId().get().equals(connectableComponent.getGroupId()))
+ .findFirst();
+
+ if (optionalSpecifiedGroup.isPresent()) {
+ final ProcessGroup specifiedGroup = optionalSpecifiedGroup.get();
+ return specifiedGroup.getOutputPorts().stream()
+ .filter(component -> component.getVersionedComponentId().isPresent())
+ .filter(component -> id.equals(component.getVersionedComponentId().get()))
+ .findAny()
+ .orElse(null);
+ }
+
+ // If no child group matched the versioned component id, then look at all child groups. This is done because
+ // in older versions, we did not properly map Versioned Component ID's to Ports' parent groups. As a result,
+ // if the flow doesn't contain the properly mapped group id, we need to search all child groups.
return group.getProcessGroups().stream()
.flatMap(gr -> gr.getOutputPorts().stream())
.filter(component -> component.getVersionedComponentId().isPresent())
http://git-wip-us.apache.org/repos/asf/nifi/blob/270ce857/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java
index 521b078..074302a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java
@@ -71,6 +71,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -228,6 +229,20 @@ public class NiFiRegistryFlowMapper {
return versionedId;
}
+ private <E extends Exception> String getIdOrThrow(final Optional<String> currentVersionedId, final String componentId, final Supplier<E> exceptionSupplier) throws E {
+ if (currentVersionedId.isPresent()) {
+ return currentVersionedId.get();
+ } else {
+ final String resolved = versionedComponentIds.get(componentId);
+ if (resolved == null) {
+ throw exceptionSupplier.get();
+ }
+
+ return resolved;
+ }
+ }
+
+
private String getGroupId(final String groupId) {
return versionedComponentIds.get(groupId);
}
@@ -265,39 +280,27 @@ public class NiFiRegistryFlowMapper {
public ConnectableComponent mapConnectable(final Connectable connectable) {
final ConnectableComponent component = new InstantiatedConnectableComponent(connectable.getIdentifier(), connectable.getProcessGroupIdentifier());
- final Optional<String> versionedId = connectable.getVersionedComponentId();
- if (versionedId.isPresent()) {
- component.setId(versionedId.get());
- } else {
- final String resolved = versionedComponentIds.get(connectable.getIdentifier());
- if (resolved == null) {
- throw new IllegalArgumentException("Unable to map Connectable Component with identifier " + connectable.getIdentifier() + " to any version-controlled component");
- }
-
- component.setId(resolved);
- }
+ final String versionedId = getIdOrThrow(connectable.getVersionedComponentId(), connectable.getIdentifier(),
+ () -> new IllegalArgumentException("Unable to map Connectable Component with identifier " + connectable.getIdentifier() + " to any version-controlled component"));
+ component.setId(versionedId);
component.setComments(connectable.getComments());
+
+ final String groupId;
if (connectable instanceof RemoteGroupPort) {
final RemoteGroupPort port = (RemoteGroupPort) connectable;
final RemoteProcessGroup rpg = port.getRemoteProcessGroup();
final Optional<String> rpgVersionedId = rpg.getVersionedComponentId();
- final String groupId;
- if (rpgVersionedId.isPresent()) {
- groupId = rpgVersionedId.get();
- } else {
- final String resolved = versionedComponentIds.get(rpg.getIdentifier());
- if (resolved == null) {
- throw new IllegalArgumentException("Unable to find the Versioned Component ID for Remote Process Group that " + connectable + " belongs to");
- }
-
- groupId = resolved;
- }
+ groupId = getIdOrThrow(rpgVersionedId, rpg.getIdentifier(),
+ () -> new IllegalArgumentException("Unable to find the Versioned Component ID for Remote Process Group that " + connectable + " belongs to"));
- component.setGroupId(groupId);
} else {
- component.setGroupId(connectable.getProcessGroupIdentifier());
+ groupId = getIdOrThrow(connectable.getProcessGroup().getVersionedComponentId(), connectable.getProcessGroupIdentifier(),
+ () -> new IllegalArgumentException("Unable to find the Versioned Component ID for the Process Group that " + connectable + " belongs to"));
}
+
+ component.setGroupId(groupId);
+
component.setName(connectable.getName());
component.setType(ConnectableComponentType.valueOf(connectable.getConnectableType().name()));
return component;