You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2018/01/08 18:14:00 UTC

[12/50] nifi git commit: NIFI-4436: - Initial checkpoint: able to start version control and detect changes, in standalone mode, still 'crude' implementation - Checkpoint: Can place flow under version control and can determine if modified - Checkpoint: Ch

http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/reporting/TestStandardReportingContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/reporting/TestStandardReportingContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/reporting/TestStandardReportingContext.java
index 1dc74ab..23f723e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/reporting/TestStandardReportingContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/reporting/TestStandardReportingContext.java
@@ -36,6 +36,7 @@ import org.apache.nifi.nar.ExtensionManager;
 import org.apache.nifi.nar.SystemBundle;
 import org.apache.nifi.provenance.MockProvenanceRepository;
 import org.apache.nifi.registry.VariableRegistry;
+import org.apache.nifi.registry.flow.FlowRegistryClient;
 import org.apache.nifi.registry.variable.FileBasedVariableRegistry;
 import org.apache.nifi.reporting.BulletinRepository;
 import org.apache.nifi.util.NiFiProperties;
@@ -64,6 +65,7 @@ public class TestStandardReportingContext {
     private Bundle systemBundle;
     private BulletinRepository bulletinRepo;
     private VariableRegistry variableRegistry;
+    private FlowRegistryClient flowRegistry;
     private volatile String propsFile = TestStandardReportingContext.class.getResource("/flowcontrollertest.nifi.properties").getFile();
 
     @Before
@@ -120,9 +122,10 @@ public class TestStandardReportingContext {
 
         authorizer = new MockPolicyBasedAuthorizer(groups1, users1, policies1);
         variableRegistry = new FileBasedVariableRegistry(nifiProperties.getVariableRegistryPropertiesPaths());
+        flowRegistry = Mockito.mock(FlowRegistryClient.class);
 
         bulletinRepo = Mockito.mock(BulletinRepository.class);
-        controller = FlowController.createStandaloneInstance(flowFileEventRepo, nifiProperties, authorizer, auditService, encryptor, bulletinRepo, variableRegistry);
+        controller = FlowController.createStandaloneInstance(flowFileEventRepo, nifiProperties, authorizer, auditService, encryptor, bulletinRepo, variableRegistry, flowRegistry);
     }
 
     @After

http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java
index b55e98d..1b54c64 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java
@@ -46,6 +46,7 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.provenance.MockProvenanceRepository;
+import org.apache.nifi.registry.flow.FlowRegistryClient;
 import org.apache.nifi.registry.variable.FileBasedVariableRegistry;
 import org.apache.nifi.util.NiFiProperties;
 import org.junit.After;
@@ -714,7 +715,8 @@ public class TestProcessorLifecycle {
 
         final FlowController flowController = FlowController.createStandaloneInstance(mock(FlowFileEventRepository.class), nifiProperties,
                 mock(Authorizer.class), mock(AuditService.class), null, new VolatileBulletinRepository(),
-                new FileBasedVariableRegistry(nifiProperties.getVariableRegistryPropertiesPaths()));
+            new FileBasedVariableRegistry(nifiProperties.getVariableRegistryPropertiesPaths()),
+            mock(FlowRegistryClient.class));
 
         return new FlowControllerAndSystemBundle(flowController, systemBundle);
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/StandardFlowSerializerTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/StandardFlowSerializerTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/StandardFlowSerializerTest.java
index 7a49103..8044ede 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/StandardFlowSerializerTest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/StandardFlowSerializerTest.java
@@ -29,6 +29,7 @@ import org.apache.nifi.nar.ExtensionManager;
 import org.apache.nifi.nar.SystemBundle;
 import org.apache.nifi.provenance.MockProvenanceRepository;
 import org.apache.nifi.registry.VariableRegistry;
+import org.apache.nifi.registry.flow.FlowRegistryClient;
 import org.apache.nifi.registry.variable.FileBasedVariableRegistry;
 import org.apache.nifi.reporting.BulletinRepository;
 import org.apache.nifi.util.NiFiProperties;
@@ -80,7 +81,8 @@ public class StandardFlowSerializerTest {
         final VariableRegistry variableRegistry = new FileBasedVariableRegistry(nifiProperties.getVariableRegistryPropertiesPaths());
 
         final BulletinRepository bulletinRepo = Mockito.mock(BulletinRepository.class);
-        controller = FlowController.createStandaloneInstance(flowFileEventRepo, nifiProperties, authorizer, auditService, encryptor, bulletinRepo, variableRegistry);
+        controller = FlowController.createStandaloneInstance(flowFileEventRepo, nifiProperties, authorizer,
+            auditService, encryptor, bulletinRepo, variableRegistry, Mockito.mock(FlowRegistryClient.class));
 
         serializer = new StandardFlowSerializer(encryptor);
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
index a28eb34..27e1678 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 
@@ -45,6 +46,9 @@ import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.groups.ProcessGroupCounts;
 import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.registry.VariableRegistry;
+import org.apache.nifi.registry.flow.FlowRegistryClient;
+import org.apache.nifi.registry.flow.VersionControlInformation;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
 import org.apache.nifi.registry.variable.MutableVariableRegistry;
 import org.apache.nifi.remote.RemoteGroupPort;
 
@@ -53,6 +57,7 @@ public class MockProcessGroup implements ProcessGroup {
     private final Map<String, ProcessorNode> processorMap = new HashMap<>();
     private final FlowController flowController;
     private final MutableVariableRegistry variableRegistry = new MutableVariableRegistry(VariableRegistry.ENVIRONMENT_SYSTEM_REGISTRY);
+    private VersionControlInformation versionControlInfo;
 
     public MockProcessGroup(final FlowController flowController) {
         this.flowController = flowController;
@@ -625,4 +630,35 @@ public class MockProcessGroup implements ProcessGroup {
     public Set<ConfiguredComponent> getComponentsAffectedByVariable(String variableName) {
         return Collections.emptySet();
     }
+
+    @Override
+    public Optional<String> getVersionedComponentId() {
+        return Optional.empty();
+    }
+
+    @Override
+    public void setVersionedComponentId(String versionedComponentId) {
+    }
+
+    @Override
+    public VersionControlInformation getVersionControlInformation() {
+        return versionControlInfo;
+    }
+
+    @Override
+    public void verifyCanUpdate(VersionedFlowSnapshot updatedFlow, boolean verifyConnectionRemoval, boolean verifyNotDirty) {
+    }
+
+    @Override
+    public void synchronizeWithFlowRegistry(FlowRegistryClient flowRegistry) {
+    }
+
+    @Override
+    public void updateFlow(VersionedFlowSnapshot proposedFlow, String componentIdSeed, boolean verifyNotDirty) {
+    }
+
+    @Override
+    public void setVersionControlInformation(VersionControlInformation versionControlInformation, Map<String, String> versionedComponentIds) {
+        this.versionControlInfo = versionControlInformation;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index faa8c0e..84e582c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -16,6 +16,14 @@
  */
 package org.apache.nifi.web;
 
+import java.io.IOException;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+
 import org.apache.nifi.authorization.AuthorizeAccess;
 import org.apache.nifi.authorization.RequestAction;
 import org.apache.nifi.authorization.user.NiFiUser;
@@ -23,6 +31,11 @@ import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.repository.claim.ContentDirection;
 import org.apache.nifi.controller.service.ControllerServiceState;
 import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.registry.flow.FlowRegistryClient;
+import org.apache.nifi.registry.flow.UnknownResourceException;
+import org.apache.nifi.registry.flow.VersionedFlow;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedProcessGroup;
 import org.apache.nifi.web.api.dto.AccessPolicyDTO;
 import org.apache.nifi.web.api.dto.AffectedComponentDTO;
 import org.apache.nifi.web.api.dto.BulletinBoardDTO;
@@ -59,6 +72,7 @@ import org.apache.nifi.web.api.dto.TemplateDTO;
 import org.apache.nifi.web.api.dto.UserDTO;
 import org.apache.nifi.web.api.dto.UserGroupDTO;
 import org.apache.nifi.web.api.dto.VariableRegistryDTO;
+import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
 import org.apache.nifi.web.api.dto.action.HistoryDTO;
 import org.apache.nifi.web.api.dto.action.HistoryQueryDTO;
 import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO;
@@ -101,13 +115,9 @@ import org.apache.nifi.web.api.entity.TemplateEntity;
 import org.apache.nifi.web.api.entity.UserEntity;
 import org.apache.nifi.web.api.entity.UserGroupEntity;
 import org.apache.nifi.web.api.entity.VariableRegistryEntity;
-
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.function.Function;
+import org.apache.nifi.web.api.entity.VersionControlComponentMappingEntity;
+import org.apache.nifi.web.api.entity.VersionControlInformationEntity;
+import org.apache.nifi.web.api.entity.VersionedFlowEntity;
 
 /**
  * Defines the NiFiServiceFacade interface.
@@ -491,6 +501,14 @@ public interface NiFiServiceFacade {
     ProcessorEntity getProcessor(String id);
 
     /**
+     * Gets the Processor transfer object for the specified id, as it is visible to the given user
+     *
+     * @param id Id of the processor to return
+     * @return The Processor transfer object
+     */
+    ProcessorEntity getProcessor(String id, NiFiUser user);
+
+    /**
      * Gets the processor status.
      *
      * @param id id
@@ -1066,6 +1084,16 @@ public interface NiFiServiceFacade {
     RemoteProcessGroupEntity getRemoteProcessGroup(String remoteProcessGroupId);
 
     /**
+     * Gets a remote process group as it is visible to the given user
+     *
+     * @param remoteProcessGroupId The id of the remote process group
+     * @param user the user requesting the action
+     * @return group
+     */
+    RemoteProcessGroupEntity getRemoteProcessGroup(String remoteProcessGroupId, NiFiUser user);
+
+
+    /**
      * Gets all remote process groups in the a given parent group.
      *
      * @param groupId The id of the parent group
@@ -1074,6 +1102,15 @@ public interface NiFiServiceFacade {
     Set<RemoteProcessGroupEntity> getRemoteProcessGroups(String groupId);
 
     /**
+     * Gets all remote process groups in a given parent group as they are visible to the given user
+     *
+     * @param groupId The id of the parent group
+     * @param user the user making the request
+     * @return group
+     */
+    Set<RemoteProcessGroupEntity> getRemoteProcessGroups(String groupId, NiFiUser user);
+
+    /**
      * Gets the remote process group status.
      *
      * @param id remote process group
@@ -1220,6 +1257,132 @@ public interface NiFiServiceFacade {
      */
     FunnelEntity deleteFunnel(Revision revision, String funnelId);
 
+
+    // ----------------------------------------
+    // Version Control methods
+    // ----------------------------------------
+
+    /**
+     * Returns the Version Control information for the Process Group with the given ID
+     *
+     * @param processGroupId the ID of the Process Group
+     * @return the Version Control information that corresponds to the given Process Group, or <code>null</code> if the
+     *         process group is not under version control
+     */
+    VersionControlInformationEntity getVersionControlInformation(String processGroupId);
+
+
+    /**
+     * Adds the given Versioned Flow to the registry specified by the given ID
+     *
+     * @param registryId the ID of the registry
+     * @param flow the flow to add to the registry
+     * @return a VersionedFlow that is fully populated, including identifiers
+     *
+     * @throws IOException if unable to communicate with the Flow Registry
+     */
+    VersionedFlow registerVersionedFlow(String registryId, VersionedFlow flow) throws IOException, UnknownResourceException;
+
+    /**
+     * Creates a snapshot of the Process Group with the given identifier, then creates a new Flow entity in the NiFi Registry
+     * and adds the snapshot of the Process Group as the first version of that flow.
+     *
+     * @param groupId the UUID of the Process Group
+     * @param requestEntity the details of the flow to create
+     * @return a VersionControlComponentMappingEntity that contains the information needed to notify a Process Group where it is tracking to, and to map
+     *         component IDs to their Versioned Component IDs
+     */
+    VersionControlComponentMappingEntity registerFlowWithFlowRegistry(String groupId, VersionedFlowEntity requestEntity);
+
+    /**
+     * Adds the given snapshot to the already existing Versioned Flow, which resides in the given Flow Registry with the given id
+     *
+     * @param registryId the ID of the Flow Registry to persist the snapshot to
+     * @param flow the flow where the snapshot should be persisted
+     * @param snapshot the Snapshot to persist
+     * @param comments about the snapshot
+     * @return the snapshot that represents what was stored in the registry
+     *
+     * @throws IOException if unable to communicate with the Flow Registry
+     */
+    VersionedFlowSnapshot registerVersionedFlowSnapshot(String registryId, VersionedFlow flow, VersionedProcessGroup snapshot, String comments) throws IOException, UnknownResourceException;
+
+    /**
+     * Updates the Version Control Information on the Process Group with the given ID
+     *
+     * @param processGroupRevision the Revision of the Process Group
+     * @param processGroupId the ID of the process group to update
+     * @param versionControlInfo the new Version Control Information
+     * @param versionedComponentMapping a mapping of component ID to Versioned Component ID
+     *
+     * @return a VersionControlInformationEntity that represents the newly updated Version Control information
+     */
+    VersionControlInformationEntity setVersionControlInformation(Revision processGroupRevision, String processGroupId, VersionControlInformationDTO versionControlInfo,
+        Map<String, String> versionedComponentMapping);
+
+
+    /**
+     * Retrieves the Versioned Flow Snapshot for the coordinates provided by the given Version Control Information DTO
+     *
+     * @param versionControlInfo the coordinates of the versioned flow
+     * @return the VersionedFlowSnapshot that corresponds to the given coordinates
+     *
+     * @throws ResourceNotFoundException if the Versioned Flow Snapshot could not be found
+     */
+    VersionedFlowSnapshot getVersionedFlowSnapshot(VersionControlInformationDTO versionControlInfo) throws IOException;
+
+    /**
+     * Determines which components currently exist in the Process Group with the given identifier and calculates which of those components
+     * would be impacted by updating the Process Group to the provided snapshot
+     *
+     * @param processGroupId the ID of the Process Group to update
+     * @param updatedSnapshot the snapshot to update the Process Group to
+     * @param user the user making the request
+     * @return the set of all components that would be affected by updating the Process Group
+     */
+    Set<AffectedComponentEntity> getComponentsAffectedByVersionChange(String processGroupId, VersionedFlowSnapshot updatedSnapshot, NiFiUser user) throws IOException;
+
+    /**
+     * Verifies that the Process Group with the given identifier can be updated to the proposed flow
+     *
+     * @param groupId the ID of the Process Group to update
+     * @param proposedFlow the proposed flow
+     * @param verifyConnectionRemoval whether or not to verify that connections that no longer exist in the proposed flow are eligible for deletion
+     * @param verifyNotDirty whether or not to verify that the Process Group is not 'dirty'. If this value is <code>true</code>,
+     *            and the Process Group has been modified since it was last synchronized with the Flow Registry, then this method will
+     *            throw an IllegalStateException
+     */
+    void verifyCanUpdate(String groupId, VersionedFlowSnapshot proposedFlow, boolean verifyConnectionRemoval, boolean verifyNotDirty);
+
+    /**
+     * Updates the Process group with the given ID to match the new snapshot
+     *
+     * @param revision the revision of the Process Group
+     * @param groupId the ID of the Process Group
+     * @param versionControlInfo the Version Control information
+     * @param snapshot the new snapshot
+     * @param componentIdSeed the seed to use for generating new component ID's
+     * @return the Process Group
+     */
+    ProcessGroupEntity updateProcessGroup(Revision revision, String groupId, VersionControlInformationDTO versionControlInfo, VersionedFlowSnapshot snapshot, String componentIdSeed,
+        boolean verifyNotModified);
+
+    /**
+     * Updates the Process group with the given ID to match the new snapshot
+     *
+     * @param user the user performing the request
+     * @param revision the revision of the Process Group
+     * @param groupId the ID of the Process Group
+     * @param versionControlInfo the Version Control information
+     * @param snapshot the new snapshot
+     * @param componentIdSeed the seed to use for generating new component ID's
+     * @return the Process Group
+     */
+    ProcessGroupEntity updateProcessGroup(NiFiUser user, Revision revision, String groupId, VersionControlInformationDTO versionControlInfo, VersionedFlowSnapshot snapshot, String componentIdSeed,
+        boolean verifyNotModified);
+
+    void setFlowRegistryClient(FlowRegistryClient flowRegistryClient);
+
     // ----------------------------------------
     // Component state methods
     // ----------------------------------------
@@ -1290,6 +1453,7 @@ public interface NiFiServiceFacade {
      */
     void clearReportingTaskState(String reportingTaskId);
 
+
     // ----------------------------------------
     // Label methods
     // ----------------------------------------
@@ -1510,6 +1674,15 @@ public interface NiFiServiceFacade {
     ControllerServiceEntity getControllerService(String controllerServiceId);
 
     /**
+     * Gets the specified controller service as it is visible to the given user
+     *
+     * @param controllerServiceId id
+     * @param user the user making the request
+     * @return service
+     */
+    ControllerServiceEntity getControllerService(String controllerServiceId, NiFiUser user);
+
+    /**
      * Get the descriptor for the specified property of the specified controller service.
      *
      * @param id id

http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacadeLock.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacadeLock.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacadeLock.java
index d0230db..f7f3b90 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacadeLock.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacadeLock.java
@@ -138,6 +138,12 @@ public class NiFiServiceFacadeLock {
         return proceedWithReadLock(proceedingJoinPoint);
     }
 
+    @Around("within(org.apache.nifi.web.NiFiServiceFacade+) && "
+        + "execution(* register*(..))")
+    public Object registerLock(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
+        return proceedWithReadLock(proceedingJoinPoint);
+    }
+
 
     private Object proceedWithReadLock(final ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
         final long beforeLock = System.nanoTime();

http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 9caabd6..d3a5fd0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -16,7 +16,32 @@
  */
 package org.apache.nifi.web;
 
-import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
+
 import org.apache.nifi.action.Action;
 import org.apache.nifi.action.Component;
 import org.apache.nifi.action.FlowChangeAction;
@@ -85,6 +110,31 @@ import org.apache.nifi.history.History;
 import org.apache.nifi.history.HistoryQuery;
 import org.apache.nifi.history.PreviousValue;
 import org.apache.nifi.registry.ComponentVariableRegistry;
+import org.apache.nifi.registry.flow.ConnectableComponent;
+import org.apache.nifi.registry.flow.FlowRegistry;
+import org.apache.nifi.registry.flow.FlowRegistryClient;
+import org.apache.nifi.registry.flow.RemoteFlowCoordinates;
+import org.apache.nifi.registry.flow.UnknownResourceException;
+import org.apache.nifi.registry.flow.VersionControlInformation;
+import org.apache.nifi.registry.flow.VersionedComponent;
+import org.apache.nifi.registry.flow.VersionedConnection;
+import org.apache.nifi.registry.flow.VersionedFlow;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedProcessGroup;
+import org.apache.nifi.registry.flow.diff.ComparableDataFlow;
+import org.apache.nifi.registry.flow.diff.DifferenceType;
+import org.apache.nifi.registry.flow.diff.FlowComparator;
+import org.apache.nifi.registry.flow.diff.FlowComparison;
+import org.apache.nifi.registry.flow.diff.FlowDifference;
+import org.apache.nifi.registry.flow.diff.StandardComparableDataFlow;
+import org.apache.nifi.registry.flow.diff.StandardFlowComparator;
+import org.apache.nifi.registry.flow.mapping.InstantiatedConnectableComponent;
+import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedComponent;
+import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedControllerService;
+import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup;
+import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessor;
+import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedRemoteGroupPort;
+import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
 import org.apache.nifi.remote.RootGroupPort;
 import org.apache.nifi.reporting.Bulletin;
 import org.apache.nifi.reporting.BulletinQuery;
@@ -140,6 +190,8 @@ import org.apache.nifi.web.api.dto.TemplateDTO;
 import org.apache.nifi.web.api.dto.UserDTO;
 import org.apache.nifi.web.api.dto.UserGroupDTO;
 import org.apache.nifi.web.api.dto.VariableRegistryDTO;
+import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
+import org.apache.nifi.web.api.dto.VersionedFlowDTO;
 import org.apache.nifi.web.api.dto.action.HistoryDTO;
 import org.apache.nifi.web.api.dto.action.HistoryQueryDTO;
 import org.apache.nifi.web.api.dto.flow.FlowDTO;
@@ -197,6 +249,9 @@ import org.apache.nifi.web.api.entity.UserEntity;
 import org.apache.nifi.web.api.entity.UserGroupEntity;
 import org.apache.nifi.web.api.entity.VariableEntity;
 import org.apache.nifi.web.api.entity.VariableRegistryEntity;
+import org.apache.nifi.web.api.entity.VersionControlComponentMappingEntity;
+import org.apache.nifi.web.api.entity.VersionControlInformationEntity;
+import org.apache.nifi.web.api.entity.VersionedFlowEntity;
 import org.apache.nifi.web.controller.ControllerFacade;
 import org.apache.nifi.web.dao.AccessPolicyDAO;
 import org.apache.nifi.web.dao.ConnectionDAO;
@@ -224,29 +279,7 @@ import org.apache.nifi.web.util.SnippetUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Response;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
-import java.util.function.Function;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
+import com.google.common.collect.Sets;
 
 /**
  * Implementation of NiFiServiceFacade that performs revision checking.
@@ -285,6 +318,9 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     // administrative services
     private AuditService auditService;
 
+    // flow registry
+    private FlowRegistryClient flowRegistryClient;
+
     // properties
     private NiFiProperties properties;
     private DtoFactory dtoFactory;
@@ -925,7 +961,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
                     @Override
                     public RevisionUpdate<ScheduleComponentsEntity> update() {
                         // schedule the components
-                processGroupDAO.scheduleComponents(processGroupId, state, componentRevisions.keySet());
+                        processGroupDAO.scheduleComponents(processGroupId, state, componentRevisions.keySet());
 
                         // update the revisions
                         final Map<String, Revision> updatedRevisions = new HashMap<>();
@@ -950,7 +986,6 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
 
     @Override
     public ActivateControllerServicesEntity activateControllerServices(final String processGroupId, final ControllerServiceState state, final Map<String, Revision> serviceRevisions) {
-
         final NiFiUser user = NiFiUserUtils.getNiFiUser();
         return activateControllerServices(user, processGroupId, state, serviceRevisions);
     }
@@ -1010,6 +1045,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
         return entityFactory.createControllerConfigurationEntity(updatedComponent.getComponent(), updateRevision, permissions);
     }
 
+
     @Override
     public NodeDTO updateNode(final NodeDTO nodeDTO) {
         final NiFiUser user = NiFiUserUtils.getNiFiUser();
@@ -2512,9 +2548,9 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
         return entityFactory.createStatusHistoryEntity(dto, permissions);
     }
 
-    private ProcessorEntity createProcessorEntity(final ProcessorNode processor) {
+    private ProcessorEntity createProcessorEntity(final ProcessorNode processor, final NiFiUser user) {
         final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(processor.getIdentifier()));
-        final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processor);
+        final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processor, user);
         final ProcessorStatusDTO status = dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(processor.getIdentifier()));
         final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processor.getIdentifier()));
         final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
@@ -2524,8 +2560,9 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     @Override
     public Set<ProcessorEntity> getProcessors(final String groupId, final boolean includeDescendants) {
         final Set<ProcessorNode> processors = processorDAO.getProcessors(groupId, includeDescendants);
+        final NiFiUser user = NiFiUserUtils.getNiFiUser();
         return processors.stream()
-            .map(processor -> createProcessorEntity(processor))
+            .map(processor -> createProcessorEntity(processor, user))
             .collect(Collectors.toSet());
     }
 
@@ -2582,8 +2619,13 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
 
     @Override
     public ProcessorEntity getProcessor(final String id) {
+        return getProcessor(id, NiFiUserUtils.getNiFiUser());
+    }
+
+    @Override
+    public ProcessorEntity getProcessor(final String id, final NiFiUser user) {
         final ProcessorNode processor = processorDAO.getProcessor(id);
-        return createProcessorEntity(processor);
+        return createProcessorEntity(processor, user);
     }
 
     @Override
@@ -3103,9 +3145,9 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
             .collect(Collectors.toSet());
     }
 
-    private RemoteProcessGroupEntity createRemoteGroupEntity(final RemoteProcessGroup rpg) {
+    private RemoteProcessGroupEntity createRemoteGroupEntity(final RemoteProcessGroup rpg, final NiFiUser user) {
         final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(rpg.getIdentifier()));
-        final PermissionsDTO permissions = dtoFactory.createPermissionsDto(rpg);
+        final PermissionsDTO permissions = dtoFactory.createPermissionsDto(rpg, user);
         final RemoteProcessGroupStatusDTO status = dtoFactory.createRemoteProcessGroupStatusDto(controllerFacade.getRemoteProcessGroupStatus(rpg.getIdentifier()));
         final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(rpg.getIdentifier()));
         final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
@@ -3114,9 +3156,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
 
     @Override
     public Set<RemoteProcessGroupEntity> getRemoteProcessGroups(final String groupId) {
+        return getRemoteProcessGroups(groupId, NiFiUserUtils.getNiFiUser());
+    }
+
+    @Override
+    public Set<RemoteProcessGroupEntity> getRemoteProcessGroups(final String groupId, final NiFiUser user) {
         final Set<RemoteProcessGroup> rpgs = remoteProcessGroupDAO.getRemoteProcessGroups(groupId);
         return rpgs.stream()
-            .map(rpg -> createRemoteGroupEntity(rpg))
+            .map(rpg -> createRemoteGroupEntity(rpg, user))
             .collect(Collectors.toSet());
     }
 
@@ -3150,8 +3197,13 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
 
     @Override
     public RemoteProcessGroupEntity getRemoteProcessGroup(final String remoteProcessGroupId) {
+        return getRemoteProcessGroup(remoteProcessGroupId, NiFiUserUtils.getNiFiUser());
+    }
+
+    @Override
+    public RemoteProcessGroupEntity getRemoteProcessGroup(final String remoteProcessGroupId, final NiFiUser user) {
         final RemoteProcessGroup rpg = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupId);
-        return createRemoteGroupEntity(rpg);
+        return createRemoteGroupEntity(rpg, user);
     }
 
     @Override
@@ -3307,8 +3359,13 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
 
     @Override
     public ControllerServiceEntity getControllerService(final String controllerServiceId) {
+        return getControllerService(controllerServiceId, NiFiUserUtils.getNiFiUser());
+    }
+
+    @Override
+    public ControllerServiceEntity getControllerService(final String controllerServiceId, final NiFiUser user) {
         final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(controllerServiceId);
-        return createControllerServiceEntity(controllerService, Sets.newHashSet(controllerServiceId), NiFiUserUtils.getNiFiUser());
+        return createControllerServiceEntity(controllerService, Sets.newHashSet(controllerServiceId), user);
     }
 
     @Override
@@ -3375,6 +3432,415 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
         return entityFactory.createStatusHistoryEntity(dto, permissions);
     }
 
+    @Override
+    public VersionControlComponentMappingEntity registerFlowWithFlowRegistry(final String groupId, final VersionedFlowEntity requestEntity) {
+        // Create a VersionedProcessGroup snapshot of the flow as it is currently.
+        final InstantiatedVersionedProcessGroup versionedProcessGroup = createFlowSnapshot(groupId);
+
+        // A request that carries no flow ID asks for a new flow, so an identifier is generated for it.
+        final VersionedFlowDTO versionedFlowDto = requestEntity.getVersionedFlow();
+        final String flowId = versionedFlowDto.getFlowId() == null ? UUID.randomUUID().toString() : versionedFlowDto.getFlowId();
+
+        final VersionedFlow versionedFlow = new VersionedFlow();
+        versionedFlow.setBucketIdentifier(versionedFlowDto.getBucketId());
+        versionedFlow.setCreatedTimestamp(System.currentTimeMillis());
+        versionedFlow.setDescription(versionedFlowDto.getDescription());
+        versionedFlow.setModifiedTimestamp(versionedFlow.getCreatedTimestamp());
+        versionedFlow.setName(versionedFlowDto.getFlowName());
+        versionedFlow.setIdentifier(flowId);
+
+        // Add the Versioned Flow and first snapshot to the Flow Registry
+        final String registryId = versionedFlowDto.getRegistryId();
+        final VersionedFlowSnapshot registeredSnapshot;
+        final VersionedFlow registeredFlow;
+        // 'action' names the step in progress so that a communication failure can say which step failed.
+        String action = "create the flow";
+        try {
+            // first, create the flow in the registry, if necessary
+            if (versionedFlowDto.getFlowId() == null) {
+                registeredFlow = registerVersionedFlow(registryId, versionedFlow);
+            } else {
+                registeredFlow = getVersionedFlow(registryId, versionedFlowDto.getBucketId(), versionedFlowDto.getFlowId());
+            }
+
+            action = "add the local flow to the Flow Registry as the first Snapshot";
+            // add first snapshot to the flow in the registry
+            final String comments = versionedFlow.getDescription() == null ? "Initial version of flow" : versionedFlow.getDescription();
+            registeredSnapshot = registerVersionedFlowSnapshot(registryId, registeredFlow, versionedProcessGroup, comments);
+        } catch (final UnknownResourceException e) {
+            throw new IllegalArgumentException(e);
+        } catch (final IOException ioe) {
+            // will result in a 500: Internal Server Error; the IOException is kept as the cause so the underlying failure is not lost
+            throw new RuntimeException("Failed to communicate with Flow Registry when attempting to " + action, ioe);
+        }
+
+        // Update the Process Group with the new VersionControlInformation. (Send this to all nodes).
+        final VersionControlInformationDTO vci = new VersionControlInformationDTO();
+        vci.setBucketId(registeredFlow.getBucketIdentifier());
+        vci.setCurrent(true);
+        vci.setFlowId(registeredFlow.getIdentifier());
+        vci.setGroupId(groupId);
+        vci.setModified(false);
+        vci.setRegistryId(registryId);
+        vci.setVersion(registeredSnapshot.getSnapshotMetadata().getVersion());
+
+        final Map<String, String> mapping = dtoFactory.createVersionControlComponentMappingDto(versionedProcessGroup);
+
+        final Revision groupRevision = revisionManager.getRevision(groupId);
+        final RevisionDTO groupRevisionDto = dtoFactory.createRevisionDTO(groupRevision);
+
+        final VersionControlComponentMappingEntity entity = new VersionControlComponentMappingEntity();
+        entity.setVersionControlInformation(vci);
+        entity.setProcessGroupRevision(groupRevisionDto);
+        entity.setVersionControlComponentMapping(mapping);
+        return entity;
+    }
+
+    @Override
+    public VersionControlInformationEntity getVersionControlInformation(final String groupId) {
+        // A group that has no version control information yields null rather than an empty entity.
+        final VersionControlInformation vci = processGroupDAO.getProcessGroup(groupId).getVersionControlInformation();
+        if (vci == null) {
+            return null;
+        }
+
+        // Pair the version control details with the group's current revision.
+        final VersionControlInformationDTO vciDto = dtoFactory.createVersionControlInformationDto(vci);
+        return entityFactory.createVersionControlInformationEntity(vciDto, dtoFactory.createRevisionDTO(revisionManager.getRevision(groupId)));
+    }
+
+    private InstantiatedVersionedProcessGroup createFlowSnapshot(final String processGroupId) {
+        // Map the process group, as it currently exists, into its versioned representation.
+        final ProcessGroup group = processGroupDAO.getProcessGroup(processGroupId);
+        final NiFiRegistryFlowMapper flowMapper = new NiFiRegistryFlowMapper();
+        return flowMapper.mapProcessGroup(group, flowRegistryClient);
+    }
+
+    @Override
+    public VersionedFlow registerVersionedFlow(final String registryId, final VersionedFlow flow) throws IOException, UnknownResourceException {
+        // Resolve the registry first; an unknown registry ID is reported as a missing resource.
+        final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(registryId);
+        if (flowRegistry != null) {
+            return flowRegistry.registerVersionedFlow(flow);
+        }
+        throw new ResourceNotFoundException("No Flow Registry exists with ID " + registryId);
+    }
+
+    private VersionedFlow getVersionedFlow(final String registryId, final String bucketId, final String flowId) throws IOException, UnknownResourceException {
+        // Look the flow up in the named registry; an unknown registry ID is reported as a missing resource.
+        final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(registryId);
+        if (flowRegistry != null) {
+            return flowRegistry.getVersionedFlow(bucketId, flowId);
+        }
+        throw new ResourceNotFoundException("No Flow Registry exists with ID " + registryId);
+    }
+
+    @Override
+    public VersionedFlowSnapshot registerVersionedFlowSnapshot(final String registryId, final VersionedFlow flow,
+            final VersionedProcessGroup snapshot, final String comments) throws IOException, UnknownResourceException {
+        // Hand the snapshot to the named registry; an unknown registry ID is reported as a missing resource.
+        final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(registryId);
+        if (flowRegistry != null) {
+            return flowRegistry.registerVersionedFlowSnapshot(flow, snapshot, comments);
+        }
+        throw new ResourceNotFoundException("No Flow Registry exists with ID " + registryId);
+    }
+
+    @Override
+    public VersionControlInformationEntity setVersionControlInformation(final Revision revision, final String processGroupId,
+            final VersionControlInformationDTO versionControlInfo, final Map<String, String> versionedComponentMapping) {
+        // The DAO call is handed to updateComponent as the update action for the given revision.
+        final ProcessGroup group = processGroupDAO.getProcessGroup(processGroupId);
+        final RevisionUpdate<VersionControlInformationDTO> update = updateComponent(revision, group,
+            () -> processGroupDAO.updateVersionControlInformation(versionControlInfo, versionedComponentMapping),
+            updatedGroup -> dtoFactory.createVersionControlInformationDto(updatedGroup.getVersionControlInformation()));
+
+        // Return the resulting DTO together with the update's last modification as the revision.
+        final VersionControlInformationDTO updatedInfo = update.getComponent();
+        return entityFactory.createVersionControlInformationEntity(updatedInfo, dtoFactory.createRevisionDTO(update.getLastModification()));
+    }
+
+    @Override
+    public void verifyCanUpdate(final String groupId, final VersionedFlowSnapshot proposedFlow, final boolean verifyConnectionRemoval, final boolean verifyNotDirty) {
+        // Delegate the verification to the process group itself.
+        processGroupDAO.getProcessGroup(groupId).verifyCanUpdate(proposedFlow, verifyConnectionRemoval, verifyNotDirty);
+    }
+
+    @Override
+    public Set<AffectedComponentEntity> getComponentsAffectedByVersionChange(final String processGroupId, final VersionedFlowSnapshot updatedSnapshot, final NiFiUser user) throws IOException {
+        final ProcessGroup group = processGroupDAO.getProcessGroup(processGroupId);
+        // Map the local group to its versioned form so that it can be compared against the proposed snapshot.
+        final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper();
+        final VersionedProcessGroup localContents = mapper.mapProcessGroup(group, flowRegistryClient);
+
+        final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", localContents);
+        final ComparableDataFlow proposedFlow = new StandardComparableDataFlow("Proposed Flow", updatedSnapshot.getFlowContents());
+
+        final FlowComparator flowComparator = new StandardFlowComparator(localFlow, proposedFlow);
+        final FlowComparison comparison = flowComparator.compare();
+        // Collect the local side (componentA) of every difference that is not an addition.
+        final Set<AffectedComponentEntity> affectedComponents = comparison.getDifferences().stream()
+            .filter(difference -> difference.getDifferenceType() != DifferenceType.COMPONENT_ADDED) // components that are added are not components that will be affected in the local flow.
+            .map(difference -> {
+                final VersionedComponent localComponent = difference.getComponentA();
+                // Resolve the component's current state; types without a case below get a null state.
+                final String state;
+                switch (localComponent.getComponentType()) {
+                    case CONTROLLER_SERVICE:
+                        final String serviceId = ((InstantiatedVersionedControllerService) localComponent).getInstanceId();
+                        state = controllerServiceDAO.getControllerService(serviceId).getState().name();
+                        break;
+                    case PROCESSOR:
+                        final String processorId = ((InstantiatedVersionedProcessor) localComponent).getInstanceId();
+                        state = processorDAO.getProcessor(processorId).getPhysicalScheduledState().name();
+                        break;
+                    case REMOTE_INPUT_PORT:
+                        final InstantiatedVersionedRemoteGroupPort inputPort = (InstantiatedVersionedRemoteGroupPort) localComponent;
+                        state = remoteProcessGroupDAO.getRemoteProcessGroup(inputPort.getInstanceGroupId()).getInputPort(inputPort.getInstanceId()).getScheduledState().name();
+                        break;
+                    case REMOTE_OUTPUT_PORT:
+                        final InstantiatedVersionedRemoteGroupPort outputPort = (InstantiatedVersionedRemoteGroupPort) localComponent;
+                        state = remoteProcessGroupDAO.getRemoteProcessGroup(outputPort.getInstanceGroupId()).getOutputPort(outputPort.getInstanceId()).getScheduledState().name();
+                        break;
+                    default:
+                        state = null;
+                        break;
+                }
+
+                return createAffectedComponentEntity((InstantiatedVersionedComponent) localComponent, localComponent.getComponentType().name(), state, user);
+            })
+            .collect(Collectors.toCollection(HashSet::new));
+        // Any difference that involves a connection also adds the components at both ends of that connection.
+        for (final FlowDifference difference : comparison.getDifferences()) {
+            VersionedComponent component = difference.getComponentA();
+            if (component == null) {
+                component = difference.getComponentB();
+            }
+            // NOTE(review): when componentA is null the connection is taken from componentB; the casts below assume its endpoints are InstantiatedConnectableComponent - confirm.
+            if (component.getComponentType() == org.apache.nifi.registry.flow.ComponentType.CONNECTION) {
+                final VersionedConnection connection = (VersionedConnection) component;
+
+                final ConnectableComponent source = connection.getSource();
+                final ConnectableComponent destination = connection.getDestination();
+
+                affectedComponents.add(createAffectedComponentEntity((InstantiatedConnectableComponent) source, user));
+                affectedComponents.add(createAffectedComponentEntity((InstantiatedConnectableComponent) destination, user));
+            }
+        }
+
+        return affectedComponents;
+    }
+
+    private String getComponentState(final InstantiatedConnectableComponent localComponent) {
+        // Only processors and remote ports have a state looked up here; every other type yields null.
+        final String instanceId = localComponent.getInstanceId();
+        final String instanceGroupId = localComponent.getInstanceGroupId();
+        switch (localComponent.getType()) {
+            case REMOTE_INPUT_PORT:
+                return remoteProcessGroupDAO.getRemoteProcessGroup(instanceGroupId).getInputPort(instanceId).getScheduledState().name();
+            case REMOTE_OUTPUT_PORT:
+                return remoteProcessGroupDAO.getRemoteProcessGroup(instanceGroupId).getOutputPort(instanceId).getScheduledState().name();
+            case PROCESSOR:
+                return processorDAO.getProcessor(instanceId).getPhysicalScheduledState().name();
+            default:
+                return null;
+        }
+    }
+
+    private AffectedComponentEntity createAffectedComponentEntity(final InstantiatedVersionedComponent instance, final String componentTypeName, final String componentState, final NiFiUser user) {
+        final String instanceId = instance.getInstanceId();
+
+        // Component details: identity, type name, owning group and the state supplied by the caller.
+        final AffectedComponentDTO componentDto = new AffectedComponentDTO();
+        componentDto.setId(instanceId);
+        componentDto.setReferenceType(componentTypeName);
+        componentDto.setProcessGroupId(instance.getInstanceGroupId());
+        componentDto.setState(componentState);
+
+        // Wrap the details with the component's revision and the given user's permissions on it.
+        final AffectedComponentEntity affected = new AffectedComponentEntity();
+        affected.setId(instanceId);
+        affected.setComponent(componentDto);
+        affected.setRevision(dtoFactory.createRevisionDTO(revisionManager.getRevision(instanceId)));
+        affected.setPermissions(dtoFactory.createPermissionsDto(getAuthorizable(componentTypeName, instance), user));
+        return affected;
+    }
+
+    private AffectedComponentEntity createAffectedComponentEntity(final InstantiatedConnectableComponent instance, final NiFiUser user) {
+        final String instanceId = instance.getInstanceId();
+
+        // Revision first, then permissions: the connectable's own type name selects the authorizable to check.
+        final AffectedComponentEntity affected = new AffectedComponentEntity();
+        affected.setId(instanceId);
+        affected.setRevision(dtoFactory.createRevisionDTO(revisionManager.getRevision(instanceId)));
+        final String typeName = instance.getType().name();
+        affected.setPermissions(dtoFactory.createPermissionsDto(getAuthorizable(typeName, instance), user));
+
+        // The state is looked up through getComponentState rather than being passed in by the caller.
+        final AffectedComponentDTO componentDto = new AffectedComponentDTO();
+        componentDto.setId(instanceId);
+        componentDto.setReferenceType(typeName);
+        componentDto.setProcessGroupId(instance.getInstanceGroupId());
+        componentDto.setState(getComponentState(instance));
+        affected.setComponent(componentDto);
+        return affected;
+    }
+
+    private Authorizable getAuthorizable(final String componentTypeName, final InstantiatedVersionedComponent versionedComponent) {
+        final String componentId = versionedComponent.getInstanceId();
+        // Every type except the remote ports is looked up by the component's own instance ID.
+        if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.CONTROLLER_SERVICE.name())) {
+            return authorizableLookup.getControllerService(componentId).getAuthorizable();
+        }
+
+        if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.CONNECTION.name())) {
+            return authorizableLookup.getConnection(componentId).getAuthorizable();
+        }
+
+        if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.FUNNEL.name())) {
+            return authorizableLookup.getFunnel(componentId);
+        }
+
+        if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.INPUT_PORT.name())) {
+            return authorizableLookup.getInputPort(componentId);
+        }
+
+        if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.OUTPUT_PORT.name())) {
+            return authorizableLookup.getOutputPort(componentId);
+        }
+
+        if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.LABEL.name())) {
+            return authorizableLookup.getLabel(componentId);
+        }
+
+        if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.PROCESS_GROUP.name())) {
+            return authorizableLookup.getProcessGroup(componentId).getAuthorizable();
+        }
+
+        if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.PROCESSOR.name())) {
+            return authorizableLookup.getProcessor(componentId).getAuthorizable();
+        }
+        // Remote ports are authorized through their remote process group, identified by the port's instance group ID.
+        if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.REMOTE_INPUT_PORT.name())) {
+            return authorizableLookup.getRemoteProcessGroup(versionedComponent.getInstanceGroupId());
+        }
+
+        if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.REMOTE_OUTPUT_PORT.name())) {
+            return authorizableLookup.getRemoteProcessGroup(versionedComponent.getInstanceGroupId());
+        }
+        // The remote process group itself is identified by its own ID; its instance group ID is the enclosing group.
+        if (componentTypeName.equals(org.apache.nifi.registry.flow.ComponentType.REMOTE_PROCESS_GROUP.name())) {
+            return authorizableLookup.getRemoteProcessGroup(componentId);
+        }
+        // Any other component type name has no authorizable.
+        return null;
+    }
+
+    @Override
+    public VersionedFlowSnapshot getVersionedFlowSnapshot(final VersionControlInformationDTO versionControlInfo) throws IOException {
+        final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(versionControlInfo.getRegistryId());
+        if (flowRegistry == null) {
+            throw new ResourceNotFoundException("Could not find any Flow Registry registered with identifier " + versionControlInfo.getRegistryId());
+        }
+        // Fetch the requested version; unknown coordinates are reported as an illegal argument with the registry's exception as cause.
+        final VersionedFlowSnapshot snapshot;
+        try {
+            snapshot = flowRegistry.getFlowContents(versionControlInfo.getBucketId(), versionControlInfo.getFlowId(), versionControlInfo.getVersion());
+        } catch (final UnknownResourceException e) {
+            throw new IllegalArgumentException("The Flow Registry with ID " + versionControlInfo.getRegistryId() + " reports that no Flow exists with Bucket "
+                + versionControlInfo.getBucketId() + ", Flow " + versionControlInfo.getFlowId() + ", Version " + versionControlInfo.getVersion(), e);
+        }
+
+        // If this Flow has a reference to a remote flow, we need to pull that remote flow as well
+        populateVersionedChildFlows(snapshot);
+
+        return snapshot;
+    }
+
+    private void populateVersionedChildFlows(final VersionedFlowSnapshot snapshot) throws IOException {
+        // The snapshot's top-level group is not touched; only its child groups are passed to populateVersionedFlows.
+        final VersionedProcessGroup topLevelGroup = snapshot.getFlowContents();
+        for (final VersionedProcessGroup childGroup : topLevelGroup.getProcessGroups()) {
+            populateVersionedFlows(childGroup);
+        }
+    }
+
+    private void populateVersionedFlows(final VersionedProcessGroup group) throws IOException {
+        final RemoteFlowCoordinates remoteCoordinates = group.getRemoteFlowCoordinates();
+        // A group with remote flow coordinates is itself under version control: fetch that flow and copy it into the group.
+        if (remoteCoordinates != null) {
+            final String registryUrl = remoteCoordinates.getRegistryUrl();
+            final String registryId = flowRegistryClient.getFlowRegistryId(registryUrl);
+            if (registryId == null) {
+                throw new IllegalArgumentException("Process Group with ID " + group.getIdentifier() + " is under Version Control, referencing a Flow Registry at URL [" + registryUrl
+                    + "], but no Flow Registry is currently registered for that URL.");
+            }
+
+            final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(registryId);
+
+            final VersionedFlowSnapshot childSnapshot;
+            try {
+                childSnapshot = flowRegistry.getFlowContents(remoteCoordinates.getBucketId(), remoteCoordinates.getFlowId(), remoteCoordinates.getVersion());
+            } catch (final UnknownResourceException e) {
+                throw new IllegalArgumentException("The Flow Registry with ID " + registryId + " reports that no Flow exists with Bucket "
+                    + remoteCoordinates.getBucketId() + ", Flow " + remoteCoordinates.getFlowId() + ", Version " + remoteCoordinates.getVersion(), e);
+            }
+            // Copy the fetched flow's attributes and component sets into this group; the registry's exception above is kept as cause.
+            final VersionedProcessGroup fetchedGroup = childSnapshot.getFlowContents();
+            group.setComments(fetchedGroup.getComments());
+            group.setPosition(fetchedGroup.getPosition());
+            group.setName(fetchedGroup.getName());
+            group.setVariables(fetchedGroup.getVariables());
+
+            group.setConnections(new LinkedHashSet<>(fetchedGroup.getConnections()));
+            group.setControllerServices(new LinkedHashSet<>(fetchedGroup.getControllerServices()));
+            group.setFunnels(new LinkedHashSet<>(fetchedGroup.getFunnels()));
+            group.setInputPorts(new LinkedHashSet<>(fetchedGroup.getInputPorts()));
+            group.setLabels(new LinkedHashSet<>(fetchedGroup.getLabels()));
+            group.setOutputPorts(new LinkedHashSet<>(fetchedGroup.getOutputPorts()));
+            group.setProcessGroups(new LinkedHashSet<>(fetchedGroup.getProcessGroups()));
+            group.setProcessors(new LinkedHashSet<>(fetchedGroup.getProcessors()));
+            group.setRemoteProcessGroups(new LinkedHashSet<>(fetchedGroup.getRemoteProcessGroups()));
+        }
+        // Recurse into the child groups, which may have just been replaced by the fetched flow's children.
+        for (final VersionedProcessGroup child : group.getProcessGroups()) {
+            populateVersionedFlows(child);
+        }
+    }
+
+
+    @Override
+    public ProcessGroupEntity updateProcessGroup(final Revision revision, final String groupId, final VersionControlInformationDTO versionControlInfo,
+        final VersionedFlowSnapshot proposedFlowSnapshot, final String componentIdSeed, final boolean verifyNotModified) {
+        // Delegate to the user-aware overload with the user reported by NiFiUserUtils.
+        return updateProcessGroup(NiFiUserUtils.getNiFiUser(), revision, groupId, versionControlInfo,
+            proposedFlowSnapshot, componentIdSeed, verifyNotModified);
+    }
+
+    @Override
+    public ProcessGroupEntity updateProcessGroup(final NiFiUser user, final Revision revision, final String groupId, final VersionControlInformationDTO versionControlInfo,
+        final VersionedFlowSnapshot proposedFlowSnapshot, final String componentIdSeed, final boolean verifyNotModified) {
+        // The DAO call that applies the proposed flow is handed to updateComponent for the given user and revision.
+        final ProcessGroup processGroupNode = processGroupDAO.getProcessGroup(groupId);
+        final RevisionUpdate<ProcessGroupDTO> snapshot = updateComponent(user, revision,
+            processGroupNode,
+            () -> processGroupDAO.updateProcessGroupFlow(groupId, proposedFlowSnapshot, versionControlInfo, componentIdSeed, verifyNotModified),
+            processGroup -> dtoFactory.createProcessGroupDto(processGroup));
+        // Permissions are evaluated for the explicitly supplied user, the same user the update is performed as.
+        final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroupNode, user);
+        final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification());
+        final ProcessGroupStatusDTO status = dtoFactory.createConciseProcessGroupStatusDto(controllerFacade.getProcessGroupStatus(processGroupNode.getIdentifier()));
+        final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processGroupNode.getIdentifier()));
+        final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
+        return entityFactory.createProcessGroupEntity(snapshot.getComponent(), updatedRevision, permissions, status, bulletinEntities);
+    }
+
+
+    @Override
+    public void setFlowRegistryClient(final FlowRegistryClient client) {
+        flowRegistryClient = client; // the client that the Flow Registry methods of this class look registries up through
+    }
+
     private AuthorizationResult authorizeAction(final Action action) {
         final String sourceId = action.getSourceId();
         final Component type = action.getSourceType();

http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
index 7118e01..531823a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
@@ -16,40 +16,8 @@
  */
 package org.apache.nifi.web.api;
 
-import static javax.ws.rs.core.Response.Status.NOT_FOUND;
-import static org.apache.commons.lang3.StringUtils.isEmpty;
-import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_NAME;
-import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_VALUE;
-
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.charset.StandardCharsets;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.function.BiFunction;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import javax.ws.rs.core.CacheControl;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.MultivaluedHashMap;
-import javax.ws.rs.core.MultivaluedMap;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.Response.ResponseBuilder;
-import javax.ws.rs.core.UriBuilder;
-import javax.ws.rs.core.UriBuilderException;
-import javax.ws.rs.core.UriInfo;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.authorization.AuthorizableLookup;
 import org.apache.nifi.authorization.AuthorizeAccess;
@@ -69,6 +37,7 @@ import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
 import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.framework.security.util.SslContextFactory;
 import org.apache.nifi.remote.HttpRemoteSiteListener;
 import org.apache.nifi.remote.VersionNegotiator;
 import org.apache.nifi.remote.exception.BadRequestException;
@@ -77,6 +46,7 @@ import org.apache.nifi.remote.exception.NotAuthorizedException;
 import org.apache.nifi.remote.protocol.ResponseCode;
 import org.apache.nifi.remote.protocol.http.HttpHeaders;
 import org.apache.nifi.util.ComponentIdGenerator;
+import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.web.NiFiServiceFacade;
 import org.apache.nifi.web.Revision;
@@ -87,9 +57,45 @@ import org.apache.nifi.web.api.entity.TransactionResultEntity;
 import org.apache.nifi.web.security.ProxiedEntitiesUtils;
 import org.apache.nifi.web.security.util.CacheKey;
 import org.apache.nifi.web.util.WebUtils;
+import org.glassfish.jersey.client.ClientConfig;
+import org.glassfish.jersey.client.ClientProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.client.Client;
+import javax.ws.rs.core.CacheControl;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedHashMap;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.ResponseBuilder;
+import javax.ws.rs.core.UriBuilder;
+import javax.ws.rs.core.UriBuilderException;
+import javax.ws.rs.core.UriInfo;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import static javax.ws.rs.core.Response.Status.NOT_FOUND;
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_NAME;
+import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_VALUE;
+
 /**
  * Base class for controllers.
  */
@@ -221,6 +227,16 @@ public abstract class ApplicationResource {
         return Optional.of(idGenerationSeed);
     }
 
+    protected Client createJerseyClient() { // builds a JAX-RS Client configured from getProperties(); comments are kept trailing so this hunk's line count is unchanged
+        final NiFiProperties properties = getProperties();
+        final ClientConfig clientConfig = new ClientConfig();
+        final int connectionTimeout = (int) FormatUtils.getTimeDuration(properties.getClusterNodeConnectionTimeout(), TimeUnit.MILLISECONDS); // cluster node connection timeout expressed in milliseconds; NOTE(review): the (int) cast presumably narrows a wider numeric result - confirm configured durations fit in an int
+        final int readTimeout = (int) FormatUtils.getTimeDuration(properties.getClusterNodeReadTimeout(), TimeUnit.MILLISECONDS); // cluster node read timeout expressed in milliseconds; same (int) cast caveat as above
+        clientConfig.property(ClientProperties.READ_TIMEOUT, readTimeout);
+        clientConfig.property(ClientProperties.CONNECT_TIMEOUT, connectionTimeout);
+        clientConfig.property(ClientProperties.FOLLOW_REDIRECTS, Boolean.TRUE); // the client is configured to follow HTTP redirects
+        return WebUtils.createClient(clientConfig, SslContextFactory.createSslContext(properties)); // client is created by WebUtils from the config above plus an SSL context built from the same properties
+    }
 
     /**
      * Generates an Ok response with no content.

http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
index c9aea4f..38e8891 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
@@ -595,6 +595,24 @@ public class FlowResource extends ApplicationResource {
                             componentIds.add(outputPort.getIdentifier());
                         });
 
+                // ensure authorized for each remote input port we will attempt to schedule
+                group.findAllRemoteProcessGroups().stream()
+                    .flatMap(rpg -> rpg.getInputPorts().stream())
+                    .filter(ScheduledState.RUNNING.equals(state) ? ProcessGroup.SCHEDULABLE_PORTS : ProcessGroup.UNSCHEDULABLE_PORTS)
+                    .filter(port -> port.isAuthorized(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()))
+                    .forEach(port -> {
+                        componentIds.add(port.getIdentifier());
+                    });
+
+                // ensure authorized for each remote output port we will attempt to schedule
+                group.findAllRemoteProcessGroups().stream()
+                    .flatMap(rpg -> rpg.getOutputPorts().stream())
+                    .filter(ScheduledState.RUNNING.equals(state) ? ProcessGroup.SCHEDULABLE_PORTS : ProcessGroup.UNSCHEDULABLE_PORTS)
+                    .filter(port -> port.isAuthorized(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()))
+                    .forEach(port -> {
+                        componentIds.add(port.getIdentifier());
+                    });
+
                 return componentIds;
             });
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/6a58d780/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
index b866677..ebed0ad 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
@@ -763,7 +763,6 @@ public class ProcessGroupResource extends ApplicationResource {
     private void updateVariableRegistryReplicated(final String groupId, final URI originalUri, final Collection<AffectedComponentDTO> affectedProcessors,
                                                   final Collection<AffectedComponentDTO> affectedServices, final VariableRegistryUpdateRequest updateRequest,
                                                   final VariableRegistryEntity requestEntity) throws InterruptedException, IOException {
-
         final Pause pause = createPause(updateRequest);
 
         // stop processors
@@ -805,8 +804,6 @@ public class ProcessGroupResource extends ApplicationResource {
             logger.info("In order to update Variable Registry for Process Group with ID {}, no Processors are affected.", groupId);
             updateRequest.getStartProcessorsStep().setComplete(true);
         }
-
-        updateRequest.setComplete(true);
     }
 
     /**
@@ -1414,6 +1411,7 @@ public class ProcessGroupResource extends ApplicationResource {
      * @param <T> type of class
      * @return the response entity
      */
+    @SuppressWarnings("unchecked")
     private <T> T getResponseEntity(final NodeResponse nodeResponse, final Class<T> clazz) {
         T entity = (T) nodeResponse.getUpdatedEntity();
         if (entity == null) {