You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2018/01/08 18:14:19 UTC

[31/50] nifi git commit: NIFI-4436, NIFI-4461: When copying and pasting an RPG, ensure that we copy Batch Settings for each Port. Bug fixes. Now works in clustered mode.

NIFI-4436, NIFI-4461: When copying and pasting an RPG, ensure that we copy Batch Settings for each Port. Bug fixes. Now works in clustered mode.

Signed-off-by: Matt Gilman <ma...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e1606701
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e1606701
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e1606701

Branch: refs/heads/master
Commit: e1606701c78984f794f96814d8f84a3f686099cf
Parents: c92022d
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Nov 22 09:55:30 2017 -0500
Committer: Bryan Bende <bb...@apache.org>
Committed: Mon Jan 8 12:44:54 2018 -0500

----------------------------------------------------------------------
 .../nifi/groups/StandardProcessGroup.java       | 128 +++++++++----------
 .../registry/flow/RestBasedFlowRegistry.java    |   1 +
 .../flow/mapping/NiFiRegistryDtoMapper.java     |   5 +-
 .../flow/mapping/NiFiRegistryFlowMapper.java    |  26 +++-
 .../mapping/StandardComparableDataFlow.java     |  42 ++++++
 .../nifi/remote/StandardRemoteProcessGroup.java |   1 +
 .../fingerprint/FingerprintFactoryTest.java     |   1 +
 .../nifi/web/StandardNiFiServiceFacade.java     |  27 +---
 .../org/apache/nifi/web/api/FlowResource.java   |  18 ---
 .../nifi/web/api/ProcessGroupResource.java      |   2 +-
 .../apache/nifi/web/api/VersionsResource.java   |   6 +-
 .../org/apache/nifi/web/api/dto/DtoFactory.java |   1 +
 12 files changed, 143 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/e1606701/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 51839d0..77be3fe 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -21,6 +21,7 @@ import static java.util.Objects.requireNonNull;
 import java.io.IOException;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
+import java.security.SecureRandom;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -93,12 +94,15 @@ import org.apache.nifi.processor.StandardProcessContext;
 import org.apache.nifi.registry.ComponentVariableRegistry;
 import org.apache.nifi.registry.VariableDescriptor;
 import org.apache.nifi.registry.client.NiFiRegistryException;
+import org.apache.nifi.registry.flow.BatchSize;
 import org.apache.nifi.registry.flow.Bundle;
+import org.apache.nifi.registry.flow.ComponentType;
 import org.apache.nifi.registry.flow.ConnectableComponent;
 import org.apache.nifi.registry.flow.FlowRegistry;
 import org.apache.nifi.registry.flow.FlowRegistryClient;
 import org.apache.nifi.registry.flow.StandardVersionControlInformation;
 import org.apache.nifi.registry.flow.VersionControlInformation;
+import org.apache.nifi.registry.flow.VersionedComponent;
 import org.apache.nifi.registry.flow.VersionedConnection;
 import org.apache.nifi.registry.flow.VersionedControllerService;
 import org.apache.nifi.registry.flow.VersionedFlow;
@@ -128,7 +132,6 @@ import org.apache.nifi.remote.StandardRemoteProcessGroupPortDescriptor;
 import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
 import org.apache.nifi.scheduling.ExecutionNode;
 import org.apache.nifi.scheduling.SchedulingStrategy;
-import org.apache.nifi.util.ComponentIdGenerator;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.ReflectionUtils;
 import org.apache.nifi.web.Revision;
@@ -145,6 +148,7 @@ public final class StandardProcessGroup implements ProcessGroup {
     private final AtomicReference<String> comments;
     private final AtomicReference<String> versionedComponentId = new AtomicReference<>();
     private final AtomicReference<StandardVersionControlInformation> versionControlInfo = new AtomicReference<>();
+    private static final SecureRandom randomGenerator = new SecureRandom();
 
     private final StandardProcessScheduler scheduler;
     private final ControllerServiceProvider controllerServiceProvider;
@@ -3018,10 +3022,20 @@ public final class StandardProcessGroup implements ProcessGroup {
             final FlowComparator flowComparator = new StandardFlowComparator(localFlow, remoteFlow, new StaticDifferenceDescriptor());
             final FlowComparison flowComparison = flowComparator.compare();
 
-            final Set<String> updatedVersionedComponentIds = flowComparison.getDifferences().stream()
-                .filter(diff -> diff.getDifferenceType() != DifferenceType.POSITION_CHANGED)
-                .map(diff -> diff.getComponentA() == null ? diff.getComponentB().getIdentifier() : diff.getComponentA().getIdentifier())
-                .collect(Collectors.toSet());
+            final Set<String> updatedVersionedComponentIds = new HashSet<>();
+            for (final FlowDifference diff : flowComparison.getDifferences()) {
+                if (diff.getDifferenceType() == DifferenceType.POSITION_CHANGED) {
+                    continue;
+                }
+
+                final VersionedComponent component = diff.getComponentA() == null ? diff.getComponentB() : diff.getComponentA();
+                updatedVersionedComponentIds.add(component.getIdentifier());
+
+                if (component.getComponentType() == ComponentType.REMOTE_INPUT_PORT || component.getComponentType() == ComponentType.REMOTE_OUTPUT_PORT) {
+                    final String remoteGroupId = ((VersionedRemoteGroupPort) component).getRemoteGroupId();
+                    updatedVersionedComponentIds.add(remoteGroupId);
+                }
+            }
 
             if (LOG.isInfoEnabled()) {
                 final String differencesByLine = flowComparison.getDifferences().stream()
@@ -3106,13 +3120,13 @@ public final class StandardProcessGroup implements ProcessGroup {
                 .registryId(registryId)
                 .registryName(registryName)
                 .bucketId(bucketId)
-                .bucketName(bucketId) // bucket name not yet known
+                .bucketName(bucketId)
                 .flowId(flowId)
-                .flowName(flowId) // flow id not yet known
+                .flowName(flowId)
                 .version(version)
                 .flowSnapshot(proposed)
                 .modified(false)
-                .current(true)
+                .current(remoteCoordinates.getLatest())
                 .build();
 
             group.setVersionControlInformation(vci, Collections.emptyMap());
@@ -3125,7 +3139,6 @@ public final class StandardProcessGroup implements ProcessGroup {
 
         for (final VersionedProcessGroup proposedChildGroup : proposed.getProcessGroups()) {
             final ProcessGroup childGroup = childGroupsByVersionedId.get(proposedChildGroup.getIdentifier());
-
             final VersionedFlowCoordinates childCoordinates = proposedChildGroup.getVersionedFlowCoordinates();
 
             if (childGroup == null) {
@@ -3291,7 +3304,7 @@ public final class StandardProcessGroup implements ProcessGroup {
                 final RemoteProcessGroup added = addRemoteProcessGroup(group, proposedRpg, componentIdSeed);
                 LOG.info("Added {} to {}", added, this);
             } else if (updatedVersionedComponentIds.contains(proposedRpg.getIdentifier())) {
-                updateRemoteProcessGroup(rpg, proposedRpg);
+                updateRemoteProcessGroup(rpg, proposedRpg, componentIdSeed);
                 LOG.info("Updated {}", rpg);
             } else {
                 rpg.setPosition(new Position(proposedRpg.getPosition().getX(), proposedRpg.getPosition().getY()));
@@ -3388,27 +3401,28 @@ public final class StandardProcessGroup implements ProcessGroup {
         }
     }
 
-    protected String generateUuid(final String componentIdSeed) {
+    private String generateUuid(final String propposedId, final String destinationGroupId, final String seed) {
+        // TODO: I think we can get rid of all of those LinkedHashSets now in the VersionedProcessGroup because
+        // the UUID is properly keyed off of the ID of the component in the VersionedProcessGroup.
+        long msb = UUID.nameUUIDFromBytes((propposedId + destinationGroupId).getBytes(StandardCharsets.UTF_8)).getMostSignificantBits();
+
         UUID uuid;
-        if (componentIdSeed == null) {
-            uuid = ComponentIdGenerator.generateId();
+        if (StringUtils.isBlank(seed)) {
+            long lsb = randomGenerator.nextLong();
+            // msb comes from a name-based (type 3) UUID, so its version bits are carried into the generated UUID
+            uuid = new UUID(msb, lsb);
         } else {
-            try {
-                UUID seedId = UUID.fromString(componentIdSeed);
-                uuid = new UUID(seedId.getMostSignificantBits(), componentIdSeed.hashCode());
-            } catch (Exception e) {
-                LOG.warn("Provided 'seed' does not represent UUID. Will not be able to extract most significant bits for ID generation.");
-                uuid = UUID.nameUUIDFromBytes(componentIdSeed.getBytes(StandardCharsets.UTF_8));
-            }
+            UUID seedId = UUID.nameUUIDFromBytes((propposedId + destinationGroupId + seed).getBytes(StandardCharsets.UTF_8));
+            uuid = new UUID(msb, seedId.getLeastSignificantBits());
         }
-
+        LOG.debug("Generating UUID {} from currentId={}, seed={}", uuid, propposedId, seed);
         return uuid.toString();
     }
 
 
     private ProcessGroup addProcessGroup(final ProcessGroup destination, final VersionedProcessGroup proposed, final String componentIdSeed, final Set<String> variablesToSkip)
             throws ProcessorInstantiationException {
-        final ProcessGroup group = flowController.createProcessGroup(generateUuid(componentIdSeed));
+        final ProcessGroup group = flowController.createProcessGroup(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed));
         group.setVersionedComponentId(proposed.getIdentifier());
         group.setParent(destination);
         updateProcessGroup(group, proposed, componentIdSeed, Collections.emptySet(), true, true, true, variablesToSkip);
@@ -3461,7 +3475,8 @@ public final class StandardProcessGroup implements ProcessGroup {
                 + " but no component could be found in the Process Group with a corresponding identifier");
         }
 
-        final Connection connection = flowController.createConnection(generateUuid(componentIdSeed), proposed.getName(), source, destination, proposed.getSelectedRelationships());
+        final Connection connection = flowController.createConnection(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), proposed.getName(), source, destination,
+            proposed.getSelectedRelationships());
         connection.setVersionedComponentId(proposed.getIdentifier());
         destinationGroup.addConnection(connection);
         updateConnection(connection, proposed);
@@ -3523,7 +3538,7 @@ public final class StandardProcessGroup implements ProcessGroup {
                 final String rpgId = connectableComponent.getGroupId();
                 final Optional<RemoteProcessGroup> rpgOption = group.getRemoteProcessGroups().stream()
                     .filter(component -> component.getVersionedComponentId().isPresent())
-                    .filter(component -> id.equals(component.getVersionedComponentId().get()))
+                    .filter(component -> rpgId.equals(component.getVersionedComponentId().get()))
                     .findAny();
 
                 if (!rpgOption.isPresent()) {
@@ -3598,7 +3613,7 @@ public final class StandardProcessGroup implements ProcessGroup {
 
     private ControllerServiceNode addControllerService(final ProcessGroup destination, final VersionedControllerService proposed, final String componentIdSeed) {
         final String type = proposed.getType();
-        final String id = generateUuid(componentIdSeed);
+        final String id = generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed);
 
         final Bundle bundle = proposed.getBundle();
         final BundleCoordinate coordinate = toCoordinate(bundle);
@@ -3619,7 +3634,7 @@ public final class StandardProcessGroup implements ProcessGroup {
     }
 
     private Funnel addFunnel(final ProcessGroup destination, final VersionedFunnel proposed, final String componentIdSeed) {
-        final Funnel funnel = flowController.createFunnel(generateUuid(componentIdSeed));
+        final Funnel funnel = flowController.createFunnel(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed));
         funnel.setVersionedComponentId(proposed.getIdentifier());
         destination.addFunnel(funnel);
         updateFunnel(funnel, proposed);
@@ -3634,7 +3649,7 @@ public final class StandardProcessGroup implements ProcessGroup {
     }
 
     private Port addInputPort(final ProcessGroup destination, final VersionedPort proposed, final String componentIdSeed) {
-        final Port port = flowController.createLocalInputPort(generateUuid(componentIdSeed), proposed.getName());
+        final Port port = flowController.createLocalInputPort(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), proposed.getName());
         port.setVersionedComponentId(proposed.getIdentifier());
         destination.addInputPort(port);
         updatePort(port, proposed);
@@ -3643,7 +3658,7 @@ public final class StandardProcessGroup implements ProcessGroup {
     }
 
     private Port addOutputPort(final ProcessGroup destination, final VersionedPort proposed, final String componentIdSeed) {
-        final Port port = flowController.createLocalOutputPort(generateUuid(componentIdSeed), proposed.getName());
+        final Port port = flowController.createLocalOutputPort(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), proposed.getName());
         port.setVersionedComponentId(proposed.getIdentifier());
         destination.addOutputPort(port);
         updatePort(port, proposed);
@@ -3652,7 +3667,7 @@ public final class StandardProcessGroup implements ProcessGroup {
     }
 
     private Label addLabel(final ProcessGroup destination, final VersionedLabel proposed, final String componentIdSeed) {
-        final Label label = flowController.createLabel(generateUuid(componentIdSeed), proposed.getLabel());
+        final Label label = flowController.createLabel(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), proposed.getLabel());
         label.setVersionedComponentId(proposed.getIdentifier());
         destination.addLabel(label);
         updateLabel(label, proposed);
@@ -3669,7 +3684,7 @@ public final class StandardProcessGroup implements ProcessGroup {
 
     private ProcessorNode addProcessor(final ProcessGroup destination, final VersionedProcessor proposed, final String componentIdSeed) throws ProcessorInstantiationException {
         final BundleCoordinate coordinate = toCoordinate(proposed.getBundle());
-        final ProcessorNode procNode = flowController.createProcessor(proposed.getType(), generateUuid(componentIdSeed), coordinate, true);
+        final ProcessorNode procNode = flowController.createProcessor(proposed.getType(), generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), coordinate, true);
         procNode.setVersionedComponentId(proposed.getIdentifier());
 
         destination.addProcessor(procNode);
@@ -3717,25 +3732,25 @@ public final class StandardProcessGroup implements ProcessGroup {
     }
 
     private RemoteProcessGroup addRemoteProcessGroup(final ProcessGroup destination, final VersionedRemoteProcessGroup proposed, final String componentIdSeed) {
-        final RemoteProcessGroup rpg = flowController.createRemoteProcessGroup(generateUuid(componentIdSeed), proposed.getTargetUris());
+        final RemoteProcessGroup rpg = flowController.createRemoteProcessGroup(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), proposed.getTargetUris());
         rpg.setVersionedComponentId(proposed.getIdentifier());
 
         destination.addRemoteProcessGroup(rpg);
-        updateRemoteProcessGroup(rpg, proposed);
+        updateRemoteProcessGroup(rpg, proposed, componentIdSeed);
 
         return rpg;
     }
 
-    private void updateRemoteProcessGroup(final RemoteProcessGroup rpg, final VersionedRemoteProcessGroup proposed) {
+    private void updateRemoteProcessGroup(final RemoteProcessGroup rpg, final VersionedRemoteProcessGroup proposed, final String componentIdSeed) {
         rpg.setComments(proposed.getComments());
         rpg.setCommunicationsTimeout(proposed.getCommunicationsTimeout());
         rpg.setInputPorts(proposed.getInputPorts() == null ? Collections.emptySet() : proposed.getInputPorts().stream()
-            .map(port -> createPortDescriptor(port))
+            .map(port -> createPortDescriptor(port, componentIdSeed, rpg.getIdentifier()))
             .collect(Collectors.toSet()));
         rpg.setName(proposed.getName());
         rpg.setNetworkInterface(proposed.getLocalNetworkInterface());
         rpg.setOutputPorts(proposed.getOutputPorts() == null ? Collections.emptySet() : proposed.getOutputPorts().stream()
-            .map(port -> createPortDescriptor(port))
+            .map(port -> createPortDescriptor(port, componentIdSeed, rpg.getIdentifier()))
             .collect(Collectors.toSet()));
         rpg.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY()));
         rpg.setProxyHost(proposed.getProxyHost());
@@ -3745,16 +3760,22 @@ public final class StandardProcessGroup implements ProcessGroup {
         rpg.setYieldDuration(proposed.getYieldDuration());
     }
 
-    private RemoteProcessGroupPortDescriptor createPortDescriptor(final VersionedRemoteGroupPort proposed) {
+    private RemoteProcessGroupPortDescriptor createPortDescriptor(final VersionedRemoteGroupPort proposed, final String componentIdSeed, final String rpgId) {
         final StandardRemoteProcessGroupPortDescriptor descriptor = new StandardRemoteProcessGroupPortDescriptor();
         descriptor.setVersionedComponentId(proposed.getIdentifier());
-        descriptor.setBatchCount(proposed.getBatchSize().getCount());
-        descriptor.setBatchDuration(proposed.getBatchSize().getDuration());
-        descriptor.setBatchSize(proposed.getBatchSize().getSize());
+
+        final BatchSize batchSize = proposed.getBatchSize();
+        if (batchSize != null) {
+            descriptor.setBatchCount(batchSize.getCount());
+            descriptor.setBatchDuration(batchSize.getDuration());
+            descriptor.setBatchSize(batchSize.getSize());
+        }
+
         descriptor.setComments(proposed.getComments());
         descriptor.setConcurrentlySchedulableTaskCount(proposed.getConcurrentlySchedulableTaskCount());
-        descriptor.setGroupId(proposed.getGroupId());
-        descriptor.setId(UUID.randomUUID().toString()); // TODO: Need to address this issue of port id's
+        descriptor.setGroupId(proposed.getRemoteGroupId());
+        descriptor.setTargetId(proposed.getTargetId());
+        descriptor.setId(generateUuid(proposed.getIdentifier(), rpgId, componentIdSeed));
         descriptor.setName(proposed.getName());
         descriptor.setUseCompression(proposed.isUseCompression());
         return descriptor;
@@ -3785,29 +3806,8 @@ public final class StandardProcessGroup implements ProcessGroup {
         final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper();
         final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(this, flowController.getFlowRegistryClient(), false);
 
-        final ComparableDataFlow currentFlow = new ComparableDataFlow() {
-            @Override
-            public VersionedProcessGroup getContents() {
-                return versionedGroup;
-            }
-
-            @Override
-            public String getName() {
-                return "Local Flow";
-            }
-        };
-
-        final ComparableDataFlow snapshotFlow = new ComparableDataFlow() {
-            @Override
-            public VersionedProcessGroup getContents() {
-                return vci.getFlowSnapshot();
-            }
-
-            @Override
-            public String getName() {
-                return "Versioned Flow";
-            }
-        };
+        final ComparableDataFlow currentFlow = new StandardComparableDataFlow("Local Flow", versionedGroup);
+        final ComparableDataFlow snapshotFlow = new StandardComparableDataFlow("Versioned Flow", vci.getFlowSnapshot());
 
         final FlowComparator flowComparator = new StandardFlowComparator(snapshotFlow, currentFlow, new EvolvingDifferenceDescriptor());
         final FlowComparison comparison = flowComparator.compare();

http://git-wip-us.apache.org/repos/asf/nifi/blob/e1606701/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java
index 1d3eec6..1147b9e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/RestBasedFlowRegistry.java
@@ -231,6 +231,7 @@ public class RestBasedFlowRegistry implements FlowRegistry {
             group.setProcessors(contents.getProcessors());
             group.setRemoteProcessGroups(contents.getRemoteProcessGroups());
             group.setVariables(contents.getVariables());
+            coordinates.setLatest(snapshot.isLatest());
         }
 
         for (final VersionedProcessGroup child : group.getProcessGroups()) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/e1606701/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryDtoMapper.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryDtoMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryDtoMapper.java
index c3c1037..193bde8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryDtoMapper.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryDtoMapper.java
@@ -309,10 +309,11 @@ public class NiFiRegistryDtoMapper {
         port.setGroupIdentifier(getGroupId(dto.getGroupId()));
         port.setComments(dto.getComments());
         port.setConcurrentlySchedulableTaskCount(dto.getConcurrentlySchedulableTaskCount());
-        port.setGroupId(dto.getGroupId());
+        port.setRemoteGroupId(dto.getGroupId());
         port.setName(dto.getName());
         port.setUseCompression(dto.getUseCompression());
-        port.setBatchSettings(mapBatchSettings(dto.getBatchSettings()));
+        port.setBatchSize(mapBatchSettings(dto.getBatchSettings()));
+        port.setTargetId(dto.getTargetId());
         port.setComponentType(componentType);
         return port;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e1606701/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java
index a10a1b8..7bab76d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java
@@ -310,7 +310,26 @@ public class NiFiRegistryFlowMapper {
         }
 
         component.setComments(connectable.getComments());
-        component.setGroupId(connectable.getProcessGroupIdentifier());
+        if (connectable instanceof RemoteGroupPort) {
+            final RemoteGroupPort port = (RemoteGroupPort) connectable;
+            final RemoteProcessGroup rpg = port.getRemoteProcessGroup();
+            final Optional<String> rpgVersionedId = rpg.getVersionedComponentId();
+            final String groupId;
+            if (rpgVersionedId.isPresent()) {
+                groupId = rpgVersionedId.get();
+            } else {
+                final String resolved = versionedComponentIds.get(rpg.getIdentifier());
+                if (resolved == null) {
+                    throw new IllegalArgumentException("Unable to find the Versioned Component ID for Remote Process Group that " + connectable + " belongs to");
+                }
+
+                groupId = resolved;
+            }
+
+            component.setGroupId(groupId);
+        } else {
+            component.setGroupId(connectable.getProcessGroupIdentifier());
+        }
         component.setName(connectable.getName());
         component.setType(ConnectableComponentType.valueOf(connectable.getConnectableType().name()));
         return component;
@@ -478,10 +497,11 @@ public class NiFiRegistryFlowMapper {
         port.setGroupIdentifier(getGroupId(remotePort.getRemoteProcessGroup().getIdentifier()));
         port.setComments(remotePort.getComments());
         port.setConcurrentlySchedulableTaskCount(remotePort.getMaxConcurrentTasks());
-        port.setGroupId(remotePort.getProcessGroupIdentifier());
+        port.setRemoteGroupId(getGroupId(remotePort.getRemoteProcessGroup().getIdentifier()));
         port.setName(remotePort.getName());
         port.setUseCompression(remotePort.isUseCompression());
-        port.setBatchSettings(mapBatchSettings(remotePort));
+        port.setBatchSize(mapBatchSettings(remotePort));
+        port.setTargetId(remotePort.getTargetIdentifier());
         port.setComponentType(componentType);
         return port;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e1606701/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/StandardComparableDataFlow.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/StandardComparableDataFlow.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/StandardComparableDataFlow.java
new file mode 100644
index 0000000..fe92f91
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/StandardComparableDataFlow.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.registry.flow.mapping;
+
+import org.apache.nifi.registry.flow.VersionedProcessGroup;
+import org.apache.nifi.registry.flow.diff.ComparableDataFlow;
+
+public class StandardComparableDataFlow implements ComparableDataFlow {
+    private final String name;
+    private final VersionedProcessGroup contents;
+
+    public StandardComparableDataFlow(final String name, final VersionedProcessGroup contents) {
+        this.name = name;
+        this.contents = contents;
+    }
+
+    @Override
+    public String getName() {
+        return name;
+    }
+
+    @Override
+    public VersionedProcessGroup getContents() {
+        return contents;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/e1606701/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
index 726daa0..039ac66 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
@@ -780,6 +780,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
             if (!StringUtils.isBlank(descriptor.getBatchDuration())) {
                 port.setBatchDuration(descriptor.getBatchDuration());
             }
+            port.setVersionedComponentId(descriptor.getVersionedComponentId());
 
             inputPorts.put(descriptor.getId(), port);
             return port;

http://git-wip-us.apache.org/repos/asf/nifi/blob/e1606701/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/fingerprint/FingerprintFactoryTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/fingerprint/FingerprintFactoryTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/fingerprint/FingerprintFactoryTest.java
index 31f1fbe..30294a4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/fingerprint/FingerprintFactoryTest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/fingerprint/FingerprintFactoryTest.java
@@ -300,6 +300,7 @@ public class FingerprintFactoryTest {
         // Assert fingerprints with expected one.
         final String expected = "portId" +
                 "NO_VALUE" +
+                "NO_VALUE" +
                 "3" +
                 "true" +
                 "1234" +

http://git-wip-us.apache.org/repos/asf/nifi/blob/e1606701/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index c66aebb..e0594fa 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -3756,29 +3756,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
         final VersionedProcessGroup localGroup = mapper.mapProcessGroup(processGroup, flowRegistryClient, false);
         final VersionedProcessGroup registryGroup = versionedFlowSnapshot.getFlowContents();
 
-        final ComparableDataFlow localFlow = new ComparableDataFlow() {
-            @Override
-            public VersionedProcessGroup getContents() {
-                return localGroup;
-            }
-
-            @Override
-            public String getName() {
-                return "Local Flow";
-            }
-        };
-
-        final ComparableDataFlow registryFlow = new ComparableDataFlow() {
-            @Override
-            public VersionedProcessGroup getContents() {
-                return registryGroup;
-            }
-
-            @Override
-            public String getName() {
-                return "Versioned Flow";
-            }
-        };
+        final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", localGroup);
+        final ComparableDataFlow registryFlow = new StandardComparableDataFlow("Versioned Flow", registryGroup);
 
         final FlowComparator flowComparator = new StandardFlowComparator(registryFlow, localFlow, new ConciseEvolvingDifferenceDescriptor());
         final FlowComparison flowComparison = flowComparator.compare();
@@ -4037,7 +4016,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
         }
 
         if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.REMOTE_PROCESS_GROUP.name())) {
-            return authorizableLookup.getRemoteProcessGroup(versionedComponent.getInstanceGroupId());
+            return authorizableLookup.getRemoteProcessGroup(componentId);
         }
 
         return null;

http://git-wip-us.apache.org/repos/asf/nifi/blob/e1606701/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
index 6bf4cca..0ae5864 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
@@ -604,24 +604,6 @@ public class FlowResource extends ApplicationResource {
                             componentIds.add(outputPort.getIdentifier());
                         });
 
-                // ensure authorized for each remote input port we will attempt to schedule
-                group.findAllRemoteProcessGroups().stream()
-                    .flatMap(rpg -> rpg.getInputPorts().stream())
-                    .filter(ScheduledState.RUNNING.equals(state) ? ProcessGroup.SCHEDULABLE_PORTS : ProcessGroup.UNSCHEDULABLE_PORTS)
-                    .filter(port -> port.isAuthorized(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()))
-                    .forEach(port -> {
-                        componentIds.add(port.getIdentifier());
-                    });
-
-                // ensure authorized for each remote output port we will attempt to schedule
-                group.findAllRemoteProcessGroups().stream()
-                    .flatMap(rpg -> rpg.getOutputPorts().stream())
-                    .filter(ScheduledState.RUNNING.equals(state) ? ProcessGroup.SCHEDULABLE_PORTS : ProcessGroup.UNSCHEDULABLE_PORTS)
-                    .filter(port -> port.isAuthorized(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()))
-                    .forEach(port -> {
-                        componentIds.add(port.getIdentifier());
-                    });
-
                 return componentIds;
             });
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/e1606701/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
index 7262a82..3fa4462 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
@@ -1641,7 +1641,7 @@ public class ProcessGroupResource extends ApplicationResource {
         // Step 6: Replicate the request or call serviceFacade.updateProcessGroup
 
         final VersionControlInformationDTO versionControlInfo = requestProcessGroupEntity.getComponent().getVersionControlInformation();
-        if (versionControlInfo != null) {
+        if (versionControlInfo != null && requestProcessGroupEntity.getVersionedFlowSnapshot() == null) {
             // Step 1: Ensure that user has write permissions to the Process Group. If not, then immediately fail.
             // Step 2: Retrieve flow from Flow Registry
             final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(versionControlInfo, true);

http://git-wip-us.apache.org/repos/asf/nifi/blob/e1606701/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
index f2a207e..6e61182 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
@@ -709,7 +709,7 @@ public class VersionsResource extends ApplicationResource {
                 final VersionControlInformationDTO versionControlInfoDto = new VersionControlInformationDTO();
                 versionControlInfoDto.setBucketId(snapshotMetadata.getBucketIdentifier());
                 versionControlInfoDto.setBucketName(bucket.getName());
-                versionControlInfoDto.setCurrent(true);
+                versionControlInfoDto.setCurrent(snapshotMetadata.getVersion() == flow.getVersionCount());
                 versionControlInfoDto.setFlowId(snapshotMetadata.getFlowIdentifier());
                 versionControlInfoDto.setFlowName(flow.getName());
                 versionControlInfoDto.setFlowDescription(flow.getDescription());
@@ -1152,7 +1152,7 @@ public class VersionsResource extends ApplicationResource {
         final String idGenerationSeed = getIdGenerationSeed().orElse(null);
 
         // Step 0: Get the Versioned Flow Snapshot from the Flow Registry
-        final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(requestEntity.getVersionControlInformation(), false);
+        final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(requestEntity.getVersionControlInformation(), true);
 
         // The flow in the registry may not contain the same versions of components that we have in our flow. As a result, we need to update
         // the flow snapshot to contain compatible bundles.
@@ -1217,7 +1217,7 @@ public class VersionsResource extends ApplicationResource {
                 final Consumer<AsynchronousWebRequest<VersionControlInformationEntity>> updateTask = vcur -> {
                     try {
                         final VersionControlInformationEntity updatedVersionControlEntity = updateFlowVersion(groupId, componentLifecycle, exampleUri,
-                            affectedComponents, user, replicateRequest, requestEntity, flowSnapshot, request, idGenerationSeed, false, false);
+                            affectedComponents, user, replicateRequest, requestEntity, flowSnapshot, request, idGenerationSeed, false, true);
 
                         vcur.markComplete(updatedVersionControlEntity);
                     } catch (final LifecycleManagementException e) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/e1606701/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index 7d40473..fb60604 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -3428,6 +3428,7 @@ public final class DtoFactory {
             batchCopy.setCount(batchOrg.getCount());
             batchCopy.setSize(batchOrg.getSize());
             batchCopy.setDuration(batchOrg.getDuration());
+            copy.setBatchSettings(batchCopy);
         }
         return copy;
     }