You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2017/09/14 15:13:06 UTC

[3/4] nifi git commit: NIFI-4280: - Adding support for the user to configure variables in the UI. - Updating the endpoints for changing variables as necessary. This closes #2135.

http://git-wip-us.apache.org/repos/asf/nifi/blob/eac47e90/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
index 0b634a1..0e574e0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
@@ -16,56 +16,18 @@
  */
 package org.apache.nifi.web.api;
 
-import java.io.InputStream;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DELETE;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.GET;
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.POST;
-import javax.ws.rs.PUT;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.Response.Status;
-import javax.ws.rs.core.UriBuilder;
-import javax.ws.rs.core.UriInfo;
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBElement;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Unmarshaller;
-import javax.xml.transform.stream.StreamSource;
-
+import com.sun.jersey.api.core.ResourceContext;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
+import com.sun.jersey.multipart.FormDataParam;
+import com.wordnik.swagger.annotations.Api;
+import com.wordnik.swagger.annotations.ApiOperation;
+import com.wordnik.swagger.annotations.ApiParam;
+import com.wordnik.swagger.annotations.ApiResponse;
+import com.wordnik.swagger.annotations.ApiResponses;
+import com.wordnik.swagger.annotations.Authorization;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.authorization.AuthorizableLookup;
+import org.apache.nifi.authorization.AuthorizeAccess;
 import org.apache.nifi.authorization.AuthorizeControllerServiceReference;
 import org.apache.nifi.authorization.Authorizer;
 import org.apache.nifi.authorization.ComponentAuthorizable;
@@ -75,19 +37,18 @@ import org.apache.nifi.authorization.SnippetAuthorizable;
 import org.apache.nifi.authorization.TemplateContentsAuthorizable;
 import org.apache.nifi.authorization.resource.Authorizable;
 import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.authorization.user.NiFiUserDetails;
 import org.apache.nifi.authorization.user.NiFiUserUtils;
 import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.cluster.manager.NodeResponse;
 import org.apache.nifi.connectable.ConnectableType;
 import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.serialization.FlowEncodingVersion;
 import org.apache.nifi.controller.service.ControllerServiceState;
-import org.apache.nifi.framework.security.util.SslContextFactory;
 import org.apache.nifi.registry.variable.VariableRegistryUpdateRequest;
 import org.apache.nifi.registry.variable.VariableRegistryUpdateStep;
 import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
 import org.apache.nifi.util.BundleUtils;
-import org.apache.nifi.util.FormatUtils;
-import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.web.NiFiServiceFacade;
 import org.apache.nifi.web.ResourceNotFoundException;
 import org.apache.nifi.web.Revision;
@@ -106,9 +67,9 @@ import org.apache.nifi.web.api.dto.RevisionDTO;
 import org.apache.nifi.web.api.dto.TemplateDTO;
 import org.apache.nifi.web.api.dto.VariableRegistryDTO;
 import org.apache.nifi.web.api.dto.flow.FlowDTO;
-import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO;
-import org.apache.nifi.web.api.dto.status.ProcessGroupStatusSnapshotDTO;
+import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO;
 import org.apache.nifi.web.api.entity.ActivateControllerServicesEntity;
+import org.apache.nifi.web.api.entity.AffectedComponentEntity;
 import org.apache.nifi.web.api.entity.ConnectionEntity;
 import org.apache.nifi.web.api.entity.ConnectionsEntity;
 import org.apache.nifi.web.api.entity.ControllerServiceEntity;
@@ -125,8 +86,6 @@ import org.apache.nifi.web.api.entity.LabelsEntity;
 import org.apache.nifi.web.api.entity.OutputPortsEntity;
 import org.apache.nifi.web.api.entity.PortEntity;
 import org.apache.nifi.web.api.entity.ProcessGroupEntity;
-import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity;
-import org.apache.nifi.web.api.entity.ProcessGroupStatusSnapshotEntity;
 import org.apache.nifi.web.api.entity.ProcessGroupsEntity;
 import org.apache.nifi.web.api.entity.ProcessorEntity;
 import org.apache.nifi.web.api.entity.ProcessorsEntity;
@@ -138,23 +97,61 @@ import org.apache.nifi.web.api.entity.VariableRegistryEntity;
 import org.apache.nifi.web.api.entity.VariableRegistryUpdateRequestEntity;
 import org.apache.nifi.web.api.request.ClientIdParameter;
 import org.apache.nifi.web.api.request.LongParameter;
+import org.apache.nifi.web.security.token.NiFiAuthenticationToken;
 import org.apache.nifi.web.util.Pause;
-import org.apache.nifi.web.util.WebUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.security.core.Authentication;
+import org.springframework.security.core.context.SecurityContextHolder;
 
-import com.sun.jersey.api.client.Client;
-import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.config.ClientConfig;
-import com.sun.jersey.api.client.config.DefaultClientConfig;
-import com.sun.jersey.api.core.ResourceContext;
-import com.sun.jersey.multipart.FormDataParam;
-import com.wordnik.swagger.annotations.Api;
-import com.wordnik.swagger.annotations.ApiOperation;
-import com.wordnik.swagger.annotations.ApiParam;
-import com.wordnik.swagger.annotations.ApiResponse;
-import com.wordnik.swagger.annotations.ApiResponses;
-import com.wordnik.swagger.annotations.Authorization;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import javax.ws.rs.core.UriBuilder;
+import javax.ws.rs.core.UriInfo;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.transform.stream.StreamSource;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /**
  * RESTful endpoint for managing a Group.
@@ -455,14 +452,12 @@ public class ProcessGroupResource extends ApplicationResource {
             throw new IllegalArgumentException("Group ID and Update ID must both be specified.");
         }
 
-        if (isReplicateRequest()) {
-            return replicate(HttpMethod.GET);
-        }
+        final NiFiUser user = NiFiUserUtils.getNiFiUser();
 
         // authorize access
         serviceFacade.authorizeAccess(lookup -> {
             final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable();
-            processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
+            processGroup.authorize(authorizer, RequestAction.READ, user);
         });
 
         final VariableRegistryUpdateRequest request = varRegistryUpdateRequests.get(updateId);
@@ -474,9 +469,14 @@ public class ProcessGroupResource extends ApplicationResource {
             throw new ResourceNotFoundException("Could not find a Variable Registry Update Request with identifier " + updateId + " for Process Group with identifier " + groupId);
         }
 
+        if (!user.equals(request.getUser())) {
+            throw new IllegalArgumentException("Only the user that submitted the update request can retrieve it.");
+        }
+
         final VariableRegistryUpdateRequestEntity entity = new VariableRegistryUpdateRequestEntity();
-        entity.setRequestDto(dtoFactory.createVariableRegistryUpdateRequestDto(request));
-        entity.getRequestDto().setUri(generateResourceUri("process-groups", groupId, "variable-registry", updateId));
+        entity.setRequest(dtoFactory.createVariableRegistryUpdateRequestDto(request));
+        entity.setProcessGroupRevision(request.getProcessGroupRevision());
+        entity.getRequest().setUri(generateResourceUri("process-groups", groupId, "variable-registry", updateId));
         return generateOkResponse(entity).build();
     }
 
@@ -506,15 +506,13 @@ public class ProcessGroupResource extends ApplicationResource {
             throw new IllegalArgumentException("Group ID and Update ID must both be specified.");
         }
 
-        if (isReplicateRequest()) {
-            return replicate(HttpMethod.DELETE);
-        }
+        final NiFiUser user = NiFiUserUtils.getNiFiUser();
 
         // authorize access
         serviceFacade.authorizeAccess(lookup -> {
             final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable();
-            processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
-            processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
+            processGroup.authorize(authorizer, RequestAction.READ, user);
+            processGroup.authorize(authorizer, RequestAction.WRITE, user);
         });
 
         final VariableRegistryUpdateRequest request = varRegistryUpdateRequests.remove(updateId);
@@ -526,11 +524,16 @@ public class ProcessGroupResource extends ApplicationResource {
             throw new ResourceNotFoundException("Could not find a Variable Registry Update Request with identifier " + updateId + " for Process Group with identifier " + groupId);
         }
 
+        if (!user.equals(request.getUser())) {
+            throw new IllegalArgumentException("Only the user that submitted the update request can remove it.");
+        }
+
         request.cancel();
 
         final VariableRegistryUpdateRequestEntity entity = new VariableRegistryUpdateRequestEntity();
-        entity.setRequestDto(dtoFactory.createVariableRegistryUpdateRequestDto(request));
-        entity.getRequestDto().setUri(generateResourceUri("process-groups", groupId, "variable-registry", updateId));
+        entity.setRequest(dtoFactory.createVariableRegistryUpdateRequestDto(request));
+        entity.setProcessGroupRevision(request.getProcessGroupRevision());
+        entity.getRequest().setUri(generateResourceUri("process-groups", groupId, "variable-registry", updateId));
         return generateOkResponse(entity).build();
     }
 
@@ -634,11 +637,11 @@ public class ProcessGroupResource extends ApplicationResource {
         // 1. Determine Affected Components (this includes any Processors and Controller Services and any components that reference an affected Controller Service).
         //    1a. Determine ID's of components
         //    1b. Determine Revision's of associated components
-        // 2. Stop All Affected Processors
-        // 3. Disable All Affected Controller Services
+        // 2. Stop All Active Affected Processors
+        // 3. Disable All Active Affected Controller Services
         // 4. Update the Variables
-        // 5. Re-Enable all Affected Controller Services (services only, not dependent components)
-        // 6. Re-Enable all Processors that Depended on the Controller Services
+        // 5. Re-Enable all previously Active Affected Controller Services (services only, not dependent components)
+        // 6. Re-Enable all previously Active Processors that Depended on the Controller Services
 
         // Determine the affected components (and their associated revisions)
         final VariableRegistryEntity computedEntity = serviceFacade.populateAffectedComponents(requestEntity.getVariableRegistry());
@@ -647,38 +650,77 @@ public class ProcessGroupResource extends ApplicationResource {
             throw new ResourceNotFoundException(String.format("Unable to locate group with id '%s'.", groupId));
         }
 
-        final Set<AffectedComponentDTO> affectedComponents = serviceFacade.getComponentsAffectedByVariableRegistryUpdate(requestEntity.getVariableRegistry());
+        final Set<AffectedComponentEntity> allAffectedComponents = serviceFacade.getComponentsAffectedByVariableRegistryUpdate(requestEntity.getVariableRegistry());
+        final Set<AffectedComponentDTO> activeAffectedComponents = serviceFacade.getActiveComponentsAffectedByVariableRegistryUpdate(requestEntity.getVariableRegistry());
 
-        final Map<String, List<AffectedComponentDTO>> affectedComponentsByType = affectedComponents.stream()
-            .collect(Collectors.groupingBy(comp -> comp.getComponentType()));
+        final Map<String, List<AffectedComponentDTO>> activeAffectedComponentsByType = activeAffectedComponents.stream()
+            .collect(Collectors.groupingBy(comp -> comp.getReferenceType()));
 
-        final List<AffectedComponentDTO> affectedProcessors = affectedComponentsByType.get(AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR);
-        final List<AffectedComponentDTO> affectedServices = affectedComponentsByType.get(AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE);
+        final List<AffectedComponentDTO> activeAffectedProcessors = activeAffectedComponentsByType.get(AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR);
+        final List<AffectedComponentDTO> activeAffectedServices = activeAffectedComponentsByType.get(AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE);
 
+        final NiFiUser user = NiFiUserUtils.getNiFiUser();
+
+        // define access authorize for execution below
+        final AuthorizeAccess authorizeAccess = lookup -> {
+            final Authorizable groupAuthorizable = lookup.getProcessGroup(groupId).getAuthorizable();
+            groupAuthorizable.authorize(authorizer, RequestAction.WRITE, user);
+
+            // For every component that is affected, the user must have READ permissions and WRITE permissions
+            // (because this action requires stopping the component).
+            if (activeAffectedProcessors != null) {
+                for (final AffectedComponentDTO activeAffectedComponent : activeAffectedProcessors) {
+                    final Authorizable authorizable = lookup.getProcessor(activeAffectedComponent.getId()).getAuthorizable();
+                    authorizable.authorize(authorizer, RequestAction.READ, user);
+                    authorizable.authorize(authorizer, RequestAction.WRITE, user);
+                }
+            }
+
+            if (activeAffectedServices != null) {
+                for (final AffectedComponentDTO activeAffectedComponent : activeAffectedServices) {
+                    final Authorizable authorizable = lookup.getControllerService(activeAffectedComponent.getId()).getAuthorizable();
+                    authorizable.authorize(authorizer, RequestAction.READ, user);
+                    authorizable.authorize(authorizer, RequestAction.WRITE, user);
+                }
+            }
+        };
 
         if (isReplicateRequest()) {
+            // authorize access
+            serviceFacade.authorizeAccess(authorizeAccess);
+
             // update the variable registry
-            final VariableRegistryUpdateRequest updateRequest = createVariableRegistryUpdateRequest(groupId);
+            final VariableRegistryUpdateRequest updateRequest = createVariableRegistryUpdateRequest(groupId, allAffectedComponents, user);
             updateRequest.getIdentifyRelevantComponentsStep().setComplete(true);
             final URI originalUri = getAbsolutePath();
 
             // Submit the task to be run in the background
             final Runnable taskWrapper = () -> {
                 try {
-                    updateVariableRegistryReplicated(groupId, originalUri, affectedProcessors, affectedServices, updateRequest, requestEntity);
+                    // set the user authentication token
+                    final Authentication authentication = new NiFiAuthenticationToken(new NiFiUserDetails(user));
+                    SecurityContextHolder.getContext().setAuthentication(authentication);
+
+                    updateVariableRegistryReplicated(groupId, originalUri, activeAffectedProcessors, activeAffectedServices, updateRequest, requestEntity);
                 } catch (final Exception e) {
                     logger.error("Failed to update variable registry", e);
+
+                    updateRequest.setComplete(true);
                     updateRequest.setFailureReason("An unexpected error has occurred: " + e);
+                } finally {
+                    // clear the authentication token
+                    SecurityContextHolder.getContext().setAuthentication(null);
                 }
             };
 
             variableRegistryThreadPool.submit(taskWrapper);
 
             final VariableRegistryUpdateRequestEntity responseEntity = new VariableRegistryUpdateRequestEntity();
-            responseEntity.setRequestDto(dtoFactory.createVariableRegistryUpdateRequestDto(updateRequest));
-            responseEntity.getRequestDto().setUri(generateResourceUri("process-groups", groupId, "variable-registry", "update-requests", updateRequest.getRequestId()));
+            responseEntity.setRequest(dtoFactory.createVariableRegistryUpdateRequestDto(updateRequest));
+            responseEntity.setProcessGroupRevision(updateRequest.getProcessGroupRevision());
+            responseEntity.getRequest().setUri(generateResourceUri("process-groups", groupId, "variable-registry", "update-requests", updateRequest.getRequestId()));
 
-            final URI location = URI.create(responseEntity.getRequestDto().getUri());
+            final URI location = URI.create(responseEntity.getRequest().getUri());
             return Response.status(Status.ACCEPTED).location(location).entity(responseEntity).build();
         }
 
@@ -688,34 +730,10 @@ public class ProcessGroupResource extends ApplicationResource {
             serviceFacade,
             requestEntity,
             requestRevision,
-            lookup -> {
-                final NiFiUser user = NiFiUserUtils.getNiFiUser();
-
-                final Authorizable groupAuthorizable = lookup.getProcessGroup(groupId).getAuthorizable();
-                groupAuthorizable.authorize(authorizer, RequestAction.WRITE, user);
-
-                // For every component that is affected, the user must have READ permissions and WRITE permissions
-                // (because this action requires stopping the component).
-                if (affectedProcessors != null) {
-                    for (final AffectedComponentDTO affectedComponent : affectedProcessors) {
-                        final Authorizable authorizable = lookup.getProcessor(affectedComponent.getComponentId()).getAuthorizable();
-                        authorizable.authorize(authorizer, RequestAction.READ, user);
-                        authorizable.authorize(authorizer, RequestAction.WRITE, user);
-                    }
-                }
-
-                if (affectedServices != null) {
-                    for (final AffectedComponentDTO affectedComponent : affectedServices) {
-                        final Authorizable authorizable = lookup.getControllerService(affectedComponent.getComponentId()).getAuthorizable();
-                        authorizable.authorize(authorizer, RequestAction.READ, user);
-                        authorizable.authorize(authorizer, RequestAction.WRITE, user);
-                    }
-                }
-            },
+            authorizeAccess,
             null,
-            (revision, varRegistryEntity) -> {
-                return updateVariableRegistryLocal(groupId, affectedProcessors, affectedServices, requestEntity);
-            });
+            (revision, varRegistryEntity) -> updateVariableRegistryLocal(groupId, allAffectedComponents, activeAffectedProcessors, activeAffectedServices, user, requestEntity)
+        );
     }
 
     private Pause createPause(final VariableRegistryUpdateRequest updateRequest) {
@@ -739,58 +757,49 @@ public class ProcessGroupResource extends ApplicationResource {
     }
 
     private void updateVariableRegistryReplicated(final String groupId, final URI originalUri, final Collection<AffectedComponentDTO> affectedProcessors,
-        final Collection<AffectedComponentDTO> affectedServices,
-        final VariableRegistryUpdateRequest updateRequest, final VariableRegistryEntity requestEntity) {
-
-        final NiFiProperties properties = getProperties();
-        final Client jerseyClient = WebUtils.createClient(new DefaultClientConfig(), SslContextFactory.createSslContext(properties));
-        final int connectionTimeout = (int) FormatUtils.getTimeDuration(properties.getClusterNodeConnectionTimeout(), TimeUnit.MILLISECONDS);
-        final int readTimeout = (int) FormatUtils.getTimeDuration(properties.getClusterNodeReadTimeout(), TimeUnit.MILLISECONDS);
-        jerseyClient.getProperties().put(ClientConfig.PROPERTY_CONNECT_TIMEOUT, connectionTimeout);
-        jerseyClient.getProperties().put(ClientConfig.PROPERTY_READ_TIMEOUT, readTimeout);
-        jerseyClient.getProperties().put(ClientConfig.PROPERTY_FOLLOW_REDIRECTS, Boolean.TRUE);
+                                                  final Collection<AffectedComponentDTO> affectedServices, final VariableRegistryUpdateRequest updateRequest,
+                                                  final VariableRegistryEntity requestEntity) throws InterruptedException, IOException {
 
         final Pause pause = createPause(updateRequest);
 
         // stop processors
         if (affectedProcessors != null) {
-            logger.info("In order to update Variable Registry for Process Group with ID {}, "
-                + "replicating request to stop {} affected processors", groupId, affectedProcessors.size());
-
-            scheduleProcessors(groupId, originalUri, jerseyClient, updateRequest, pause,
-                affectedProcessors, ScheduledState.STOPPED, updateRequest.getStopProcessorsStep());
+            logger.info("In order to update Variable Registry for Process Group with ID {}, replicating request to stop {} affected Processors", groupId, affectedProcessors.size());
+            scheduleProcessors(groupId, originalUri, updateRequest, pause, affectedProcessors, ScheduledState.STOPPED, updateRequest.getStopProcessorsStep());
+        } else {
+            logger.info("In order to update Variable Registry for Process Group with ID {}, no Processors are affected.", groupId);
+            updateRequest.getStopProcessorsStep().setComplete(true);
         }
 
         // disable controller services
         if (affectedServices != null) {
-            logger.info("In order to update Variable Registry for Process Group with ID {}, "
-                + "replicating request to stop {} affected Controller Services", groupId, affectedServices.size());
-
-            activateControllerServices(groupId, originalUri, jerseyClient, updateRequest, pause,
-                affectedServices, ControllerServiceState.DISABLED, updateRequest.getDisableServicesStep());
+            logger.info("In order to update Variable Registry for Process Group with ID {}, replicating request to stop {} affected Controller Services", groupId, affectedServices.size());
+            activateControllerServices(groupId, originalUri, updateRequest, pause, affectedServices, ControllerServiceState.DISABLED, updateRequest.getDisableServicesStep());
+        } else {
+            logger.info("In order to update Variable Registry for Process Group with ID {}, no Controller Services are affected.", groupId);
+            updateRequest.getDisableServicesStep().setComplete(true);
         }
 
         // apply updates
-        logger.info("In order to update Variable Registry for Process Group with ID {}, "
-            + "replicating request to apply updates to variable registry", groupId);
-        applyVariableRegistryUpdate(groupId, originalUri, jerseyClient, updateRequest, requestEntity);
+        logger.info("In order to update Variable Registry for Process Group with ID {}, replicating request to apply updates to variable registry", groupId);
+        applyVariableRegistryUpdate(groupId, originalUri, updateRequest, requestEntity);
 
         // re-enable controller services
         if (affectedServices != null) {
-            logger.info("In order to update Variable Registry for Process Group with ID {}, "
-                + "replicating request to re-enable {} affected services", groupId, affectedServices.size());
-
-            activateControllerServices(groupId, originalUri, jerseyClient, updateRequest, pause,
-                affectedServices, ControllerServiceState.ENABLED, updateRequest.getEnableServicesStep());
+            logger.info("In order to update Variable Registry for Process Group with ID {}, replicating request to re-enable {} affected services", groupId, affectedServices.size());
+            activateControllerServices(groupId, originalUri, updateRequest, pause, affectedServices, ControllerServiceState.ENABLED, updateRequest.getEnableServicesStep());
+        } else {
+            logger.info("In order to update Variable Registry for Process Group with ID {}, no Controller Services are affected.", groupId);
+            updateRequest.getEnableServicesStep().setComplete(true);
         }
 
         // restart processors
         if (affectedProcessors != null) {
-            logger.info("In order to update Variable Registry for Process Group with ID {}, "
-                + "replicating request to restart {} affected processors", groupId, affectedProcessors.size());
-
-            scheduleProcessors(groupId, originalUri, jerseyClient, updateRequest, pause,
-                affectedProcessors, ScheduledState.RUNNING, updateRequest.getStartProcessorsStep());
+            logger.info("In order to update Variable Registry for Process Group with ID {}, replicating request to restart {} affected processors", groupId, affectedProcessors.size());
+            scheduleProcessors(groupId, originalUri, updateRequest, pause, affectedProcessors, ScheduledState.RUNNING, updateRequest.getStartProcessorsStep());
+        } else {
+            logger.info("In order to update Variable Registry for Process Group with ID {}, no Processors are affected.", groupId);
+            updateRequest.getStartProcessorsStep().setComplete(true);
         }
 
         updateRequest.setComplete(true);
@@ -799,34 +808,45 @@ public class ProcessGroupResource extends ApplicationResource {
     /**
      * Periodically polls the process group with the given ID, waiting for all processors whose ID's are given to have the given Scheduled State.
      *
-     * @param client the Jersey Client to use for making the request
      * @param groupId the ID of the Process Group to poll
      * @param processorIds the ID of all Processors whose state should be equal to the given desired state
      * @param desiredState the desired state for all processors with the ID's given
      * @param pause the Pause that can be used to wait between polling
      * @return <code>true</code> if successful, <code>false</code> if unable to wait for processors to reach the desired state
      */
-    private boolean waitForProcessorStatus(final Client client, final URI originalUri, final String groupId, final Set<String> processorIds, final ScheduledState desiredState, final Pause pause) {
+    private boolean waitForProcessorStatus(final URI originalUri, final String groupId, final Set<String> processorIds, final ScheduledState desiredState,
+                                           final VariableRegistryUpdateRequest updateRequest, final Pause pause) throws InterruptedException {
         URI groupUri;
         try {
             groupUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(),
-                originalUri.getPort(), "/nifi-api/flow/process-groups/" + groupId + "/status", "recursive=true", originalUri.getFragment());
+                originalUri.getPort(), "/nifi-api/process-groups/" + groupId + "/processors", "includeDescendantGroups=true", originalUri.getFragment());
         } catch (URISyntaxException e) {
             throw new RuntimeException(e);
         }
 
+        final Map<String, String> headers = new HashMap<>();
+        final MultivaluedMap<String, String> requestEntity = new MultivaluedMapImpl();
+
         boolean continuePolling = true;
         while (continuePolling) {
-            final ClientResponse response = client.resource(groupUri).header("Content-Type", "application/json").get(ClientResponse.class);
-            if (response.getStatus() != Status.OK.getStatusCode()) {
+
+            // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly to the cluster nodes themselves.
+            final NodeResponse clusterResponse;
+            if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
+                clusterResponse = getRequestReplicator().replicate(HttpMethod.GET, groupUri, requestEntity, headers).awaitMergedResponse();
+            } else {
+                clusterResponse = getRequestReplicator().forwardToCoordinator(
+                        getClusterCoordinatorNode(), HttpMethod.GET, groupUri, requestEntity, headers).awaitMergedResponse();
+            }
+
+            if (clusterResponse.getStatus() != Status.OK.getStatusCode()) {
                 return false;
             }
 
-            final ProcessGroupStatusEntity statusEntity = response.getEntity(ProcessGroupStatusEntity.class);
-            final ProcessGroupStatusDTO statusDto = statusEntity.getProcessGroupStatus();
-            final ProcessGroupStatusSnapshotDTO statusSnapshotDto = statusDto.getAggregateSnapshot();
+            final ProcessorsEntity processorsEntity = getResponseEntity(clusterResponse, ProcessorsEntity.class);
+            final Set<ProcessorEntity> processorEntities = processorsEntity.getProcessors();
 
-            if (isProcessorStatusEqual(statusSnapshotDto, processorIds, desiredState)) {
+            if (isProcessorActionComplete(processorEntities, updateRequest, processorIds, desiredState)) {
                 logger.debug("All {} processors of interest now have the desired state of {}", processorIds.size(), desiredState);
                 return true;
             }
@@ -847,14 +867,14 @@ public class ProcessGroupResource extends ApplicationResource {
      * @param pause the Pause that can be used to wait between polling
      * @return <code>true</code> if successful, <code>false</code> if unable to wait for processors to reach the desired state
      */
-    private boolean waitForLocalProcessorStatus(final String groupId, final Set<String> processorIds, final ScheduledState desiredState, final Pause pause) {
+    private boolean waitForLocalProcessor(final String groupId, final Set<String> processorIds, final ScheduledState desiredState,
+                                          final VariableRegistryUpdateRequest updateRequest, final Pause pause) {
+
         boolean continuePolling = true;
         while (continuePolling) {
-            final ProcessGroupStatusEntity statusEntity = serviceFacade.getProcessGroupStatus(groupId, true);
-            final ProcessGroupStatusDTO statusDto = statusEntity.getProcessGroupStatus();
-            final ProcessGroupStatusSnapshotDTO statusSnapshotDto = statusDto.getAggregateSnapshot();
+            final Set<ProcessorEntity> processorEntities = serviceFacade.getProcessors(groupId, true);
 
-            if (isProcessorStatusEqual(statusSnapshotDto, processorIds, desiredState)) {
+            if (isProcessorActionComplete(processorEntities, updateRequest, processorIds, desiredState)) {
                 logger.debug("All {} processors of interest now have the desired state of {}", processorIds.size(), desiredState);
                 return true;
             }
@@ -866,55 +886,93 @@ public class ProcessGroupResource extends ApplicationResource {
         return false;
     }
 
-    private boolean isProcessorStatusEqual(final ProcessGroupStatusSnapshotDTO statusSnapshot, final Set<String> processorIds, final ScheduledState desiredState) {
+    private boolean isProcessorActionComplete(final Set<ProcessorEntity> processorEntities, final VariableRegistryUpdateRequest updateRequest,
+                                              final Set<String> processorIds, final ScheduledState desiredState) {
+
         final String desiredStateName = desiredState.name();
 
-        final boolean allProcessorsMatch = statusSnapshot.getProcessorStatusSnapshots().stream()
-            .map(entity -> entity.getProcessorStatusSnapshot())
-            .filter(status -> processorIds.contains(status.getId()))
-            .allMatch(status -> {
-                final String runStatus = status.getRunStatus();
-                final boolean stateMatches = desiredStateName.equalsIgnoreCase(runStatus);
-                if (!stateMatches) {
-                    return false;
-                }
+        // update the affected processors
+        processorEntities.stream()
+                .filter(entity -> updateRequest.getAffectedComponents().containsKey(entity.getId()))
+                .forEach(entity -> {
+                    final AffectedComponentEntity affectedComponentEntity = updateRequest.getAffectedComponents().get(entity.getId());
+                    affectedComponentEntity.setRevision(entity.getRevision());
+
+                    // only consider update this component if the user had permissions to it
+                    if (Boolean.TRUE.equals(affectedComponentEntity.getPermissions().getCanRead())) {
+                        final AffectedComponentDTO affectedComponent = affectedComponentEntity.getComponent();
+                        affectedComponent.setState(entity.getStatus().getAggregateSnapshot().getRunStatus());
+                        affectedComponent.setActiveThreadCount(entity.getStatus().getAggregateSnapshot().getActiveThreadCount());
+
+                        if (Boolean.TRUE.equals(entity.getPermissions().getCanRead())) {
+                            affectedComponent.setValidationErrors(entity.getComponent().getValidationErrors());
+                        }
+                    }
+                });
 
-                if (desiredState == ScheduledState.STOPPED && status.getActiveThreadCount() != 0) {
-                    return false;
-                }
+        final boolean allProcessorsMatch = processorEntities.stream()
+                .filter(entity -> processorIds.contains(entity.getId()))
+                .allMatch(entity -> {
+                    final ProcessorStatusDTO status = entity.getStatus();
 
-                return true;
-            });
+                    final String runStatus = status.getAggregateSnapshot().getRunStatus();
+                    final boolean stateMatches = desiredStateName.equalsIgnoreCase(runStatus);
+                    if (!stateMatches) {
+                        return false;
+                    }
+
+                    if (desiredState == ScheduledState.STOPPED && status.getAggregateSnapshot().getActiveThreadCount() != 0) {
+                        return false;
+                    }
+
+                    return true;
+                });
 
         if (!allProcessorsMatch) {
             return false;
         }
 
-        for (final ProcessGroupStatusSnapshotEntity childGroupEntity : statusSnapshot.getProcessGroupStatusSnapshots()) {
-            final ProcessGroupStatusSnapshotDTO childGroupStatus = childGroupEntity.getProcessGroupStatusSnapshot();
-            final boolean allMatchChildLevel = isProcessorStatusEqual(childGroupStatus, processorIds, desiredState);
-            if (!allMatchChildLevel) {
-                return false;
-            }
-        }
-
         return true;
     }
 
-
+    /**
+     * Updates the affected controller services in the specified updateRequest with the serviceEntities.
+     *
+     * @param serviceEntities service entities
+     * @param updateRequest update request
+     */
+    private void updateAffectedControllerServices(final Set<ControllerServiceEntity> serviceEntities, final VariableRegistryUpdateRequest updateRequest) {
+        // update the affected components
+        serviceEntities.stream()
+                .filter(entity -> updateRequest.getAffectedComponents().containsKey(entity.getId()))
+                .forEach(entity -> {
+                    final AffectedComponentEntity affectedComponentEntity = updateRequest.getAffectedComponents().get(entity.getId());
+                    affectedComponentEntity.setRevision(entity.getRevision());
+
+                    // only consider update this component if the user had permissions to it
+                    if (Boolean.TRUE.equals(affectedComponentEntity.getPermissions().getCanRead())) {
+                        final AffectedComponentDTO affectedComponent = affectedComponentEntity.getComponent();
+                        affectedComponent.setState(entity.getComponent().getState());
+
+                        if (Boolean.TRUE.equals(entity.getPermissions().getCanRead())) {
+                            affectedComponent.setValidationErrors(entity.getComponent().getValidationErrors());
+                        }
+                    }
+                });
+    }
 
     /**
      * Periodically polls the process group with the given ID, waiting for all controller services whose ID's are given to have the given Controller Service State.
      *
-     * @param client the Jersey Client to use for making the HTTP Request
      * @param groupId the ID of the Process Group to poll
      * @param serviceIds the ID of all Controller Services whose state should be equal to the given desired state
      * @param desiredState the desired state for all services with the ID's given
      * @param pause the Pause that can be used to wait between polling
      * @return <code>true</code> if successful, <code>false</code> if unable to wait for services to reach the desired state
      */
-    private boolean waitForControllerServiceStatus(final Client client, final URI originalUri, final String groupId, final Set<String> serviceIds, final ControllerServiceState desiredState,
-        final Pause pause) {
+    private boolean waitForControllerServiceStatus(final URI originalUri, final String groupId, final Set<String> serviceIds,
+                                                   final ControllerServiceState desiredState, final VariableRegistryUpdateRequest updateRequest, final Pause pause) throws InterruptedException {
+
         URI groupUri;
         try {
             groupUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(),
@@ -923,16 +981,31 @@ public class ProcessGroupResource extends ApplicationResource {
             throw new RuntimeException(e);
         }
 
+        final Map<String, String> headers = new HashMap<>();
+        final MultivaluedMap<String, String> requestEntity = new MultivaluedMapImpl();
+
         boolean continuePolling = true;
         while (continuePolling) {
-            final ClientResponse response = client.resource(groupUri).header("Content-Type", "application/json").get(ClientResponse.class);
-            if (response.getStatus() != Status.OK.getStatusCode()) {
+
+            // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly to the cluster nodes themselves.
+            final NodeResponse clusterResponse;
+            if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
+                clusterResponse = getRequestReplicator().replicate(HttpMethod.GET, groupUri, requestEntity, headers).awaitMergedResponse();
+            } else {
+                clusterResponse = getRequestReplicator().forwardToCoordinator(
+                        getClusterCoordinatorNode(), HttpMethod.GET, groupUri, requestEntity, headers).awaitMergedResponse();
+            }
+
+            if (clusterResponse.getStatus() != Status.OK.getStatusCode()) {
                 return false;
             }
 
-            final ControllerServicesEntity controllerServicesEntity = response.getEntity(ControllerServicesEntity.class);
+            final ControllerServicesEntity controllerServicesEntity = getResponseEntity(clusterResponse, ControllerServicesEntity.class);
             final Set<ControllerServiceEntity> serviceEntities = controllerServicesEntity.getControllerServices();
 
+            // update the affected controller services
+            updateAffectedControllerServices(serviceEntities, updateRequest);
+
             final String desiredStateName = desiredState.name();
             final boolean allServicesMatch = serviceEntities.stream()
                 .map(entity -> entity.getComponent())
@@ -963,11 +1036,16 @@ public class ProcessGroupResource extends ApplicationResource {
      * @param user the user that is retrieving the controller services
      * @return <code>true</code> if successful, <code>false</code> if unable to wait for services to reach the desired state
      */
-    private boolean waitForLocalControllerServiceStatus(final String groupId, final Set<String> serviceIds, final ControllerServiceState desiredState, final Pause pause, final NiFiUser user) {
+    private boolean waitForLocalControllerServiceStatus(final String groupId, final Set<String> serviceIds, final ControllerServiceState desiredState,
+                                                        final VariableRegistryUpdateRequest updateRequest, final Pause pause, final NiFiUser user) {
+
         boolean continuePolling = true;
         while (continuePolling) {
             final Set<ControllerServiceEntity> serviceEntities = serviceFacade.getControllerServices(groupId, false, true, user);
 
+            // update the affected controller services
+            updateAffectedControllerServices(serviceEntities, updateRequest);
+
             final String desiredStateName = desiredState.name();
             final boolean allServicesMatch = serviceEntities.stream()
                 .map(entity -> entity.getComponent())
@@ -975,6 +1053,7 @@ public class ProcessGroupResource extends ApplicationResource {
                 .map(service -> service.getState())
                 .allMatch(state -> desiredStateName.equals(state));
 
+
             if (allServicesMatch) {
                 logger.debug("All {} controller services of interest now have the desired state of {}", serviceIds.size(), desiredState);
                 return true;
@@ -987,8 +1066,8 @@ public class ProcessGroupResource extends ApplicationResource {
         return false;
     }
 
-    private VariableRegistryUpdateRequest createVariableRegistryUpdateRequest(final String groupId) {
-        final VariableRegistryUpdateRequest updateRequest = new VariableRegistryUpdateRequest(UUID.randomUUID().toString(), groupId);
+    private VariableRegistryUpdateRequest createVariableRegistryUpdateRequest(final String groupId, final Set<AffectedComponentEntity> affectedComponents, final NiFiUser user) {
+        final VariableRegistryUpdateRequest updateRequest = new VariableRegistryUpdateRequest(UUID.randomUUID().toString(), groupId, affectedComponents, user);
 
         // before adding to the request map, purge any old requests. Must do this by creating a List of ID's
         // and then removing those ID's one-at-a-time in order to avoid ConcurrentModificationException.
@@ -1011,27 +1090,26 @@ public class ProcessGroupResource extends ApplicationResource {
         return updateRequest;
     }
 
-    private Response updateVariableRegistryLocal(final String groupId, final List<AffectedComponentDTO> affectedProcessors, final List<AffectedComponentDTO> affectedServices,
-        final VariableRegistryEntity requestEntity) {
+    private Response updateVariableRegistryLocal(final String groupId, final Set<AffectedComponentEntity> affectedComponents, final List<AffectedComponentDTO> affectedProcessors,
+                                                 final List<AffectedComponentDTO> affectedServices, final NiFiUser user, final VariableRegistryEntity requestEntity) {
 
         final Set<String> affectedProcessorIds = affectedProcessors == null ? Collections.emptySet() : affectedProcessors.stream()
-            .map(component -> component.getComponentId())
+            .map(component -> component.getId())
             .collect(Collectors.toSet());
         Map<String, Revision> processorRevisionMap = getRevisions(groupId, affectedProcessorIds);
 
         final Set<String> affectedServiceIds = affectedServices == null ? Collections.emptySet() : affectedServices.stream()
-            .map(component -> component.getComponentId())
+            .map(component -> component.getId())
             .collect(Collectors.toSet());
         Map<String, Revision> serviceRevisionMap = getRevisions(groupId, affectedServiceIds);
 
         // update the variable registry
-        final VariableRegistryUpdateRequest updateRequest = createVariableRegistryUpdateRequest(groupId);
+        final VariableRegistryUpdateRequest updateRequest = createVariableRegistryUpdateRequest(groupId, affectedComponents, user);
         updateRequest.getIdentifyRelevantComponentsStep().setComplete(true);
         final Pause pause = createPause(updateRequest);
 
         final Revision requestRevision = getRevision(requestEntity.getProcessGroupRevision(), groupId);
 
-        final NiFiUser user = NiFiUserUtils.getNiFiUser();
         final Runnable updateTask = new Runnable() {
             @Override
             public void run() {
@@ -1052,21 +1130,26 @@ public class ProcessGroupResource extends ApplicationResource {
 
                     // Apply the updates
                     performUpdateVariableRegistryStep(groupId, updateRequest, updateRequest.getApplyUpdatesStep(), "Applying updates to Variable Registry",
-                        () -> serviceFacade.updateVariableRegistry(user, requestRevision, requestEntity.getVariableRegistry()));
+                        () -> {
+                            final VariableRegistryEntity entity = serviceFacade.updateVariableRegistry(user, requestRevision, requestEntity.getVariableRegistry());
+                            updateRequest.setProcessGroupRevision(entity.getProcessGroupRevision());
+                        });
 
                     // Re-enable the controller services
                     performUpdateVariableRegistryStep(groupId, updateRequest, updateRequest.getEnableServicesStep(), "Re-enabling Controller Services",
-                        () -> enableControllerServices(user, groupId, updatedServiceRevisionMap, pause));
+                        () -> enableControllerServices(user, updateRequest, groupId, updatedServiceRevisionMap, pause));
 
                     // Restart processors
                     performUpdateVariableRegistryStep(groupId, updateRequest, updateRequest.getStartProcessorsStep(), "Restarting Processors",
-                        () -> startProcessors(user, groupId, updatedProcessorRevisionMap, pause));
+                        () -> startProcessors(user, updateRequest, groupId, updatedProcessorRevisionMap, pause));
 
                     // Set complete
                     updateRequest.setComplete(true);
                     updateRequest.setLastUpdated(new Date());
                 } catch (final Exception e) {
                     logger.error("Failed to update Variable Registry for Proces Group with ID " + groupId, e);
+
+                    updateRequest.setComplete(true);
                     updateRequest.setFailureReason("An unexpected error has occurred: " + e);
                 }
             }
@@ -1076,10 +1159,11 @@ public class ProcessGroupResource extends ApplicationResource {
         variableRegistryThreadPool.submit(updateTask);
 
         final VariableRegistryUpdateRequestEntity responseEntity = new VariableRegistryUpdateRequestEntity();
-        responseEntity.setRequestDto(dtoFactory.createVariableRegistryUpdateRequestDto(updateRequest));
-        responseEntity.getRequestDto().setUri(generateResourceUri("process-groups", groupId, "variable-registry", "update-requests", updateRequest.getRequestId()));
+        responseEntity.setRequest(dtoFactory.createVariableRegistryUpdateRequestDto(updateRequest));
+        responseEntity.setProcessGroupRevision(updateRequest.getProcessGroupRevision());
+        responseEntity.getRequest().setUri(generateResourceUri("process-groups", groupId, "variable-registry", "update-requests", updateRequest.getRequestId()));
 
-        final URI location = URI.create(responseEntity.getRequestDto().getUri());
+        final URI location = URI.create(responseEntity.getRequest().getUri());
         return Response.status(Status.ACCEPTED).location(location).entity(responseEntity).build();
     }
 
@@ -1103,11 +1187,12 @@ public class ProcessGroupResource extends ApplicationResource {
             action.run();
             step.setComplete(true);
         } catch (final Exception e) {
-            request.setComplete(true);
             logger.error("Failed to update variable registry for Process Group with ID {}", groupId, e);
 
             step.setComplete(true);
             step.setFailureReason(e.getMessage());
+
+            request.setComplete(true);
             request.setFailureReason("Failed to update Variable Registry because failed while performing step: " + stepDescription);
         }
 
@@ -1123,21 +1208,21 @@ public class ProcessGroupResource extends ApplicationResource {
 
         serviceFacade.verifyScheduleComponents(processGroupId, ScheduledState.STOPPED, processorRevisions.keySet());
         serviceFacade.scheduleComponents(user, processGroupId, ScheduledState.STOPPED, processorRevisions);
-        waitForLocalProcessorStatus(processGroupId, processorRevisions.keySet(), ScheduledState.STOPPED, pause);
+        waitForLocalProcessor(processGroupId, processorRevisions.keySet(), ScheduledState.STOPPED, updateRequest, pause);
     }
 
-    private void startProcessors(final NiFiUser user, final String processGroupId, final Map<String, Revision> processorRevisions, final Pause pause) {
+    private void startProcessors(final NiFiUser user, final VariableRegistryUpdateRequest request, final String processGroupId, final Map<String, Revision> processorRevisions, final Pause pause) {
         if (processorRevisions.isEmpty()) {
             return;
         }
 
         serviceFacade.verifyScheduleComponents(processGroupId, ScheduledState.RUNNING, processorRevisions.keySet());
         serviceFacade.scheduleComponents(user, processGroupId, ScheduledState.RUNNING, processorRevisions);
-        waitForLocalProcessorStatus(processGroupId, processorRevisions.keySet(), ScheduledState.RUNNING, pause);
+        waitForLocalProcessor(processGroupId, processorRevisions.keySet(), ScheduledState.RUNNING, request, pause);
     }
 
     private void disableControllerServices(final NiFiUser user, final VariableRegistryUpdateRequest updateRequest, final String processGroupId,
-        final Map<String, Revision> serviceRevisions, final Pause pause) {
+                                           final Map<String, Revision> serviceRevisions, final Pause pause) {
 
         if (serviceRevisions.isEmpty()) {
             return;
@@ -1145,116 +1230,141 @@ public class ProcessGroupResource extends ApplicationResource {
 
         serviceFacade.verifyActivateControllerServices(processGroupId, ControllerServiceState.DISABLED, serviceRevisions.keySet());
         serviceFacade.activateControllerServices(user, processGroupId, ControllerServiceState.DISABLED, serviceRevisions);
-        waitForLocalControllerServiceStatus(processGroupId, serviceRevisions.keySet(), ControllerServiceState.DISABLED, pause, user);
+        waitForLocalControllerServiceStatus(processGroupId, serviceRevisions.keySet(), ControllerServiceState.DISABLED, updateRequest, pause, user);
     }
 
-    private void enableControllerServices(final NiFiUser user, final String processGroupId, final Map<String, Revision> serviceRevisions, final Pause pause) {
+    private void enableControllerServices(final NiFiUser user, final VariableRegistryUpdateRequest updateRequest, final String processGroupId,
+                                          final Map<String, Revision> serviceRevisions, final Pause pause) {
+
         if (serviceRevisions.isEmpty()) {
             return;
         }
 
         serviceFacade.verifyActivateControllerServices(processGroupId, ControllerServiceState.ENABLED, serviceRevisions.keySet());
         serviceFacade.activateControllerServices(user, processGroupId, ControllerServiceState.ENABLED, serviceRevisions);
-        waitForLocalControllerServiceStatus(processGroupId, serviceRevisions.keySet(), ControllerServiceState.ENABLED, pause, user);
+        waitForLocalControllerServiceStatus(processGroupId, serviceRevisions.keySet(), ControllerServiceState.ENABLED, updateRequest, pause, user);
     }
 
 
-    private void scheduleProcessors(final String groupId, final URI originalUri, final Client jerseyClient, final VariableRegistryUpdateRequest updateRequest,
-        final Pause pause, final Collection<AffectedComponentDTO> affectedProcessors, final ScheduledState desiredState, final VariableRegistryUpdateStep updateStep) {
+    private void scheduleProcessors(final String groupId, final URI originalUri, final VariableRegistryUpdateRequest updateRequest,
+                                    final Pause pause, final Collection<AffectedComponentDTO> affectedProcessors, final ScheduledState desiredState,
+                                    final VariableRegistryUpdateStep updateStep) throws InterruptedException {
+
         final Set<String> affectedProcessorIds = affectedProcessors.stream()
-            .map(component -> component.getComponentId())
+            .map(component -> component.getId())
             .collect(Collectors.toSet());
 
         final Map<String, Revision> processorRevisionMap = getRevisions(groupId, affectedProcessorIds);
         final Map<String, RevisionDTO> processorRevisionDtoMap = processorRevisionMap.entrySet().stream().collect(
             Collectors.toMap(Map.Entry::getKey, entry -> dtoFactory.createRevisionDTO(entry.getValue())));
 
-        final ScheduleComponentsEntity stopProcessorsEntity = new ScheduleComponentsEntity();
-        stopProcessorsEntity.setComponents(processorRevisionDtoMap);
-        stopProcessorsEntity.setId(groupId);
-        stopProcessorsEntity.setState(desiredState.name());
+        final ScheduleComponentsEntity scheduleProcessorsEntity = new ScheduleComponentsEntity();
+        scheduleProcessorsEntity.setComponents(processorRevisionDtoMap);
+        scheduleProcessorsEntity.setId(groupId);
+        scheduleProcessorsEntity.setState(desiredState.name());
 
-        URI stopProcessorUri;
+        URI scheduleGroupUri;
         try {
-            stopProcessorUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(),
+            scheduleGroupUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(),
                 originalUri.getPort(), "/nifi-api/flow/process-groups/" + groupId, null, originalUri.getFragment());
         } catch (URISyntaxException e) {
             throw new RuntimeException(e);
         }
 
-        final ClientResponse stopProcessorResponse = jerseyClient.resource(stopProcessorUri)
-            .header("Content-Type", "application/json")
-            .entity(stopProcessorsEntity)
-            .put(ClientResponse.class);
+        final Map<String, String> headers = new HashMap<>();
+        headers.put("content-type", MediaType.APPLICATION_JSON);
+
+        // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly to the cluster nodes themselves.
+        final NodeResponse clusterResponse;
+        if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
+            clusterResponse = getRequestReplicator().replicate(HttpMethod.PUT, scheduleGroupUri, scheduleProcessorsEntity, headers).awaitMergedResponse();
+        } else {
+            clusterResponse = getRequestReplicator().forwardToCoordinator(
+                    getClusterCoordinatorNode(), HttpMethod.PUT, scheduleGroupUri, scheduleProcessorsEntity, headers).awaitMergedResponse();
+        }
 
-        final int stopProcessorStatus = stopProcessorResponse.getStatus();
+        final int stopProcessorStatus = clusterResponse.getStatus();
         if (stopProcessorStatus != Status.OK.getStatusCode()) {
             updateRequest.getStopProcessorsStep().setFailureReason("Failed while " + updateStep.getDescription());
+
+            updateStep.setComplete(true);
             updateRequest.setFailureReason("Failed while " + updateStep.getDescription());
             return;
         }
 
         updateRequest.setLastUpdated(new Date());
-        final boolean processorsTransitioned = waitForProcessorStatus(jerseyClient, originalUri, groupId, affectedProcessorIds, desiredState, pause);
-        if (processorsTransitioned) {
-            updateStep.setComplete(true);
-        } else {
+        final boolean processorsTransitioned = waitForProcessorStatus(originalUri, groupId, affectedProcessorIds, desiredState, updateRequest, pause);
+        updateStep.setComplete(true);
+
+        if (!processorsTransitioned) {
             updateStep.setFailureReason("Failed while " + updateStep.getDescription());
+
+            updateRequest.setComplete(true);
             updateRequest.setFailureReason("Failed while " + updateStep.getDescription());
-            return;
         }
     }
 
-    private void activateControllerServices(final String groupId, final URI originalUri, final Client jerseyClient, final VariableRegistryUpdateRequest updateRequest,
-        final Pause pause, final Collection<AffectedComponentDTO> affectedServices, final ControllerServiceState desiredState, final VariableRegistryUpdateStep updateStep) {
+    private void activateControllerServices(final String groupId, final URI originalUri, final VariableRegistryUpdateRequest updateRequest,
+        final Pause pause, final Collection<AffectedComponentDTO> affectedServices, final ControllerServiceState desiredState, final VariableRegistryUpdateStep updateStep) throws InterruptedException {
 
         final Set<String> affectedServiceIds = affectedServices.stream()
-            .map(component -> component.getComponentId())
+            .map(component -> component.getId())
             .collect(Collectors.toSet());
 
         final Map<String, Revision> serviceRevisionMap = getRevisions(groupId, affectedServiceIds);
         final Map<String, RevisionDTO> serviceRevisionDtoMap = serviceRevisionMap.entrySet().stream().collect(
             Collectors.toMap(Map.Entry::getKey, entry -> dtoFactory.createRevisionDTO(entry.getValue())));
 
-        final ActivateControllerServicesEntity disableServicesEntity = new ActivateControllerServicesEntity();
-        disableServicesEntity.setComponents(serviceRevisionDtoMap);
-        disableServicesEntity.setId(groupId);
-        disableServicesEntity.setState(desiredState.name());
+        final ActivateControllerServicesEntity activateServicesEntity = new ActivateControllerServicesEntity();
+        activateServicesEntity.setComponents(serviceRevisionDtoMap);
+        activateServicesEntity.setId(groupId);
+        activateServicesEntity.setState(desiredState.name());
 
-        URI disableServicesUri;
+        URI controllerServicesUri;
         try {
-            disableServicesUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(),
+            controllerServicesUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(),
                 originalUri.getPort(), "/nifi-api/flow/process-groups/" + groupId + "/controller-services", null, originalUri.getFragment());
         } catch (URISyntaxException e) {
             throw new RuntimeException(e);
         }
 
-        final ClientResponse disableServicesResponse = jerseyClient.resource(disableServicesUri)
-            .header("Content-Type", "application/json")
-            .entity(disableServicesEntity)
-            .put(ClientResponse.class);
+        final Map<String, String> headers = new HashMap<>();
+        headers.put("content-type", MediaType.APPLICATION_JSON);
 
-        final int disableServicesStatus = disableServicesResponse.getStatus();
+        // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly to the cluster nodes themselves.
+        final NodeResponse clusterResponse;
+        if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
+            clusterResponse = getRequestReplicator().replicate(HttpMethod.PUT, controllerServicesUri, activateServicesEntity, headers).awaitMergedResponse();
+        } else {
+            clusterResponse = getRequestReplicator().forwardToCoordinator(
+                    getClusterCoordinatorNode(), HttpMethod.PUT, controllerServicesUri, activateServicesEntity, headers).awaitMergedResponse();
+        }
+
+        final int disableServicesStatus = clusterResponse.getStatus();
         if (disableServicesStatus != Status.OK.getStatusCode()) {
             updateStep.setFailureReason("Failed while " + updateStep.getDescription());
+
+            updateStep.setComplete(true);
             updateRequest.setFailureReason("Failed while " + updateStep.getDescription());
             return;
         }
 
         updateRequest.setLastUpdated(new Date());
-        if (waitForControllerServiceStatus(jerseyClient, originalUri, groupId, affectedServiceIds, desiredState, pause)) {
-            updateStep.setComplete(true);
-        } else {
+        final boolean serviceTransitioned = waitForControllerServiceStatus(originalUri, groupId, affectedServiceIds, desiredState, updateRequest, pause);
+        updateStep.setComplete(true);
+
+        if (!serviceTransitioned) {
             updateStep.setFailureReason("Failed while " + updateStep.getDescription());
+
+            updateRequest.setComplete(true);
             updateRequest.setFailureReason("Failed while " + updateStep.getDescription());
-            return;
         }
     }
 
+    private void applyVariableRegistryUpdate(final String groupId, final URI originalUri, final VariableRegistryUpdateRequest updateRequest,
+        final VariableRegistryEntity updateEntity) throws InterruptedException, IOException {
 
-    private void applyVariableRegistryUpdate(final String groupId, final URI originalUri, final Client jerseyClient, final VariableRegistryUpdateRequest updateRequest,
-        final VariableRegistryEntity updateEntity) {
-
+        // convert request accordingly
         URI applyUpdatesUri;
         try {
             applyUpdatesUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(),
@@ -1263,21 +1373,53 @@ public class ProcessGroupResource extends ApplicationResource {
             throw new RuntimeException(e);
         }
 
-        final ClientResponse applyUpdatesResponse = jerseyClient.resource(applyUpdatesUri)
-            .header("Content-Type", "application/json")
-            .entity(updateEntity)
-            .put(ClientResponse.class);
+        final Map<String, String> headers = new HashMap<>();
+        headers.put("content-type", MediaType.APPLICATION_JSON);
 
-        final int applyUpdatesStatus = applyUpdatesResponse.getStatus();
+        // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly to the cluster nodes themselves.
+        final NodeResponse clusterResponse;
+        if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
+            clusterResponse = getRequestReplicator().replicate(HttpMethod.PUT, applyUpdatesUri, updateEntity, headers).awaitMergedResponse();
+        } else {
+            clusterResponse = getRequestReplicator().forwardToCoordinator(
+                    getClusterCoordinatorNode(), HttpMethod.PUT, applyUpdatesUri, updateEntity, headers).awaitMergedResponse();
+        }
+
+        final int applyUpdatesStatus = clusterResponse.getStatus();
         updateRequest.setLastUpdated(new Date());
-        if (applyUpdatesStatus != Status.OK.getStatusCode()) {
-            updateRequest.getApplyUpdatesStep().setFailureReason("Failed to apply updates to the Variable Registry");
-            updateRequest.setFailureReason("Failed to apply updates to the Variable Registry");
-            return;
+        updateRequest.getApplyUpdatesStep().setComplete(true);
+
+        if (applyUpdatesStatus == Status.OK.getStatusCode()) {
+            // grab the current process group revision
+            final VariableRegistryEntity entity = getResponseEntity(clusterResponse, VariableRegistryEntity.class);
+            updateRequest.setProcessGroupRevision(entity.getProcessGroupRevision());
+        } else {
+            final String message = getResponseEntity(clusterResponse, String.class);
+
+            // update the request progress
+            updateRequest.getApplyUpdatesStep().setFailureReason("Failed to apply updates to the Variable Registry: " + message);
+            updateRequest.setComplete(true);
+            updateRequest.setFailureReason("Failed to apply updates to the Variable Registry: " + message);
         }
     }
 
     /**
+     * Extracts the response entity from the specified node response.
+     *
+     * @param nodeResponse node response
+     * @param clazz class
+     * @param <T> type of class
+     * @return the response entity
+     */
+    private <T> T getResponseEntity(final NodeResponse nodeResponse, final Class<T> clazz) {
+        T entity = (T) nodeResponse.getUpdatedEntity();
+        if (entity == null) {
+            entity = nodeResponse.getClientResponse().getEntity(clazz);
+        }
+        return entity;
+    }
+
+    /**
      * Removes the specified process group reference.
      *
      * @param httpServletRequest request
@@ -1676,7 +1818,8 @@ public class ProcessGroupResource extends ApplicationResource {
                     value = "The process group id.",
                     required = true
             )
-            @PathParam("id") final String groupId) {
+            @PathParam("id") final String groupId,
+            @ApiParam("Whether or not to include processors from descendant process groups") @QueryParam("includeDescendantGroups") @DefaultValue("false") boolean includeDescendantGroups) {
 
         if (isReplicateRequest()) {
             return replicate(HttpMethod.GET);
@@ -1689,7 +1832,7 @@ public class ProcessGroupResource extends ApplicationResource {
         });
 
         // get the processors
-        final Set<ProcessorEntity> processors = serviceFacade.getProcessors(groupId);
+        final Set<ProcessorEntity> processors = serviceFacade.getProcessors(groupId, includeDescendantGroups);
 
         // create the response entity
         final ProcessorsEntity entity = new ProcessorsEntity();
@@ -3121,7 +3264,6 @@ public class ProcessGroupResource extends ApplicationResource {
             uriBuilder.segment("process-groups", groupId, "templates", "import");
             final URI importUri = uriBuilder.build();
 
-            // change content type to XML for serializing entity
             final Map<String, String> headersToOverride = new HashMap<>();
             headersToOverride.put("content-type", MediaType.APPLICATION_XML);
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/eac47e90/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index ed42e9f..1a4d52b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -16,33 +16,6 @@
  */
 package org.apache.nifi.web.api.dto;
 
-import java.text.Collator;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TimeZone;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-
-import javax.ws.rs.WebApplicationException;
-
 import org.apache.commons.lang3.ClassUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.action.Action;
@@ -182,6 +155,7 @@ import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO;
 import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusSnapshotDTO;
 import org.apache.nifi.web.api.entity.AccessPolicyEntity;
 import org.apache.nifi.web.api.entity.AccessPolicySummaryEntity;
+import org.apache.nifi.web.api.entity.AffectedComponentEntity;
 import org.apache.nifi.web.api.entity.AllowableValueEntity;
 import org.apache.nifi.web.api.entity.BulletinEntity;
 import org.apache.nifi.web.api.entity.ComponentReferenceEntity;
@@ -196,6 +170,32 @@ import org.apache.nifi.web.api.entity.VariableEntity;
 import org.apache.nifi.web.controller.ControllerFacade;
 import org.apache.nifi.web.revision.RevisionManager;
 
+import javax.ws.rs.WebApplicationException;
+import java.text.Collator;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
 public final class DtoFactory {
 
     @SuppressWarnings("rawtypes")
@@ -1737,13 +1737,29 @@ public final class DtoFactory {
 
     public AffectedComponentDTO createAffectedComponentDto(final ConfiguredComponent component) {
         final AffectedComponentDTO dto = new AffectedComponentDTO();
-        dto.setComponentId(component.getIdentifier());
-        dto.setParentGroupId(component.getProcessGroupIdentifier());
+        dto.setId(component.getIdentifier());
+        dto.setName(component.getName());
+        dto.setProcessGroupId(component.getProcessGroupIdentifier());
 
         if (component instanceof ProcessorNode) {
-            dto.setComponentType(AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR);
+            final ProcessorNode node = ((ProcessorNode) component);
+            dto.setState(node.getScheduledState().name());
+            dto.setActiveThreadCount(node.getActiveThreadCount());
+            dto.setReferenceType(AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR);
         } else if (component instanceof ControllerServiceNode) {
-            dto.setComponentType(AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE);
+            final ControllerServiceNode node = ((ControllerServiceNode) component);
+            dto.setState(node.getState().name());
+            dto.setReferenceType(AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE);
+        }
+
+        final Collection<ValidationResult> validationErrors = component.getValidationErrors();
+        if (validationErrors != null && !validationErrors.isEmpty()) {
+            final List<String> errors = new ArrayList<>();
+            for (final ValidationResult validationResult : validationErrors) {
+                errors.add(validationResult.toString());
+            }
+
+            dto.setValidationErrors(errors);
         }
 
         return dto;
@@ -2114,8 +2130,18 @@ public final class DtoFactory {
         return deprecationNotice == null ? null : deprecationNotice.reason();
     }
 
+    public Set<AffectedComponentEntity> createAffectedComponentEntities(final Set<ConfiguredComponent> affectedComponents, final RevisionManager revisionManager) {
+        return affectedComponents.stream()
+                .map(component -> {
+                    final AffectedComponentDTO affectedComponent = createAffectedComponentDto(component);
+                    final PermissionsDTO permissions = createPermissionsDto(component);
+                    final RevisionDTO revision = createRevisionDTO(revisionManager.getRevision(component.getIdentifier()));
+                    return entityFactory.createAffectedComponentEntity(affectedComponent, revision, permissions);
+                })
+                .collect(Collectors.toSet());
+    }
 
-    public VariableRegistryDTO createVariableRegistryDto(final ProcessGroup processGroup) {
+    public VariableRegistryDTO createVariableRegistryDto(final ProcessGroup processGroup, final RevisionManager revisionManager) {
         final ComponentVariableRegistry variableRegistry = processGroup.getVariableRegistry();
 
         final List<String> variableNames = variableRegistry.getVariableMap().keySet().stream()
@@ -2130,21 +2156,18 @@ public final class DtoFactory {
             variableDto.setValue(variableRegistry.getVariableValue(variableName));
             variableDto.setProcessGroupId(processGroup.getIdentifier());
 
-            final Set<ConfiguredComponent> affectedComponents = processGroup.getComponentsAffectedByVariable(variableName);
-            final Set<AffectedComponentDTO> affectedComponentDtos = affectedComponents.stream()
-                .map(component -> createAffectedComponentDto(component))
-                .collect(Collectors.toSet());
+            final Set<AffectedComponentEntity> affectedComponentEntities = createAffectedComponentEntities(processGroup.getComponentsAffectedByVariable(variableName), revisionManager);
 
             boolean canWrite = true;
-            for (final ConfiguredComponent component : affectedComponents) {
-                final PermissionsDTO permissions = createPermissionsDto(component);
+            for (final AffectedComponentEntity affectedComponent : affectedComponentEntities) {
+                final PermissionsDTO permissions = affectedComponent.getPermissions();
                 if (!permissions.getCanRead() || !permissions.getCanWrite()) {
                     canWrite = false;
                     break;
                 }
             }
 
-            variableDto.setAffectedComponents(affectedComponentDtos);
+            variableDto.setAffectedComponents(affectedComponentEntities);
 
             final VariableEntity variableEntity = new VariableEntity();
             variableEntity.setVariable(variableDto);
@@ -2178,6 +2201,8 @@ public final class DtoFactory {
         updateSteps.add(createVariableRegistryUpdateStepDto(request.getStartProcessorsStep()));
         dto.setUpdateSteps(updateSteps);
 
+        dto.setAffectedComponents(new HashSet<>(request.getAffectedComponents().values()));
+
         return dto;
     }
 
@@ -2190,42 +2215,41 @@ public final class DtoFactory {
     }
 
 
-    public VariableRegistryDTO populateAffectedComponents(final VariableRegistryDTO variableRegistry, final ProcessGroup group) {
+    public VariableRegistryDTO populateAffectedComponents(final VariableRegistryDTO variableRegistry, final ProcessGroup group, final RevisionManager revisionManager) {
         if (!group.getIdentifier().equals(variableRegistry.getProcessGroupId())) {
             throw new IllegalArgumentException("Variable Registry does not have the same Group ID as the given Process Group");
         }
 
         final Set<VariableEntity> variableEntities = new LinkedHashSet<>();
 
-        for (final VariableEntity inputEntity : variableRegistry.getVariables()) {
-            final VariableEntity entity = new VariableEntity();
+        if (variableRegistry.getVariables() != null) {
+            for (final VariableEntity inputEntity : variableRegistry.getVariables()) {
+                final VariableEntity entity = new VariableEntity();
 
-            final VariableDTO inputDto = inputEntity.getVariable();
-            final VariableDTO variableDto = new VariableDTO();
-            variableDto.setName(inputDto.getName());
-            variableDto.setValue(inputDto.getValue());
-            variableDto.setProcessGroupId(group.getIdentifier());
+                final VariableDTO inputDto = inputEntity.getVariable();
+                final VariableDTO variableDto = new VariableDTO();
+                variableDto.setName(inputDto.getName());
+                variableDto.setValue(inputDto.getValue());
+                variableDto.setProcessGroupId(group.getIdentifier());
 
-            final Set<ConfiguredComponent> affectedComponents = group.getComponentsAffectedByVariable(variableDto.getName());
-            final Set<AffectedComponentDTO> affectedComponentDtos = affectedComponents.stream()
-                .map(component -> createAffectedComponentDto(component))
-                .collect(Collectors.toSet());
+                final Set<AffectedComponentEntity> affectedComponentEntities = createAffectedComponentEntities(group.getComponentsAffectedByVariable(variableDto.getName()), revisionManager);
 
-            boolean canWrite = true;
-            for (final ConfiguredComponent component : affectedComponents) {
-                final PermissionsDTO permissions = createPermissionsDto(component);
-                if (!permissions.getCanRead() || !permissions.getCanWrite()) {
-                    canWrite = false;
-                    break;
+                boolean canWrite = true;
+                for (final AffectedComponentEntity affectedComponent : affectedComponentEntities) {
+                    final PermissionsDTO permissions = affectedComponent.getPermissions();
+                    if (!permissions.getCanRead() || !permissions.getCanWrite()) {
+                        canWrite = false;
+                        break;
+                    }
                 }
-            }
 
-            variableDto.setAffectedComponents(affectedComponentDtos);
+                variableDto.setAffectedComponents(affectedComponentEntities);
 
-            entity.setCanWrite(canWrite);
-            entity.setVariable(inputDto);
+                entity.setCanWrite(canWrite);
+                entity.setVariable(inputDto);
 
-            variableEntities.add(entity);
+                variableEntities.add(entity);
+            }
         }
 
         final VariableRegistryDTO registryDto = new VariableRegistryDTO();

http://git-wip-us.apache.org/repos/asf/nifi/blob/eac47e90/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java
index a7f370a..16781c6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java
@@ -33,6 +33,7 @@ import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
 import org.apache.nifi.web.api.entity.AccessPolicyEntity;
 import org.apache.nifi.web.api.entity.AccessPolicySummaryEntity;
 import org.apache.nifi.web.api.entity.ActionEntity;
+import org.apache.nifi.web.api.entity.AffectedComponentEntity;
 import org.apache.nifi.web.api.entity.AllowableValueEntity;
 import org.apache.nifi.web.api.entity.BulletinEntity;
 import org.apache.nifi.web.api.entity.ComponentReferenceEntity;
@@ -311,6 +312,20 @@ public final class EntityFactory {
         return entity;
     }
 
+    public AffectedComponentEntity createAffectedComponentEntity(final AffectedComponentDTO dto, final RevisionDTO revision, final PermissionsDTO permissions) {
+        final AffectedComponentEntity entity = new AffectedComponentEntity();
+        entity.setRevision(revision);
+        if (dto != null) {
+            entity.setPermissions(permissions);
+            entity.setId(dto.getId());
+
+            if (permissions != null && permissions.getCanRead()) {
+                entity.setComponent(dto);
+            }
+        }
+        return entity;
+    }
+
     public UserGroupEntity createUserGroupEntity(final UserGroupDTO dto, final RevisionDTO revision, final PermissionsDTO permissions) {
         final UserGroupEntity entity = new UserGroupEntity();
         entity.setRevision(revision);

http://git-wip-us.apache.org/repos/asf/nifi/blob/eac47e90/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessorDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessorDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessorDAO.java
index 1d88161..a1bf170 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessorDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessorDAO.java
@@ -59,9 +59,10 @@ public interface ProcessorDAO {
      * Gets all the Processor transfer objects for this controller.
      *
      * @param groupId group id
+     * @param includeDescendants if processors from descendant groups should be included
      * @return List of all the Processors
      */
-    Set<ProcessorNode> getProcessors(String groupId);
+    Set<ProcessorNode> getProcessors(String groupId, boolean includeDescendants);
 
     /**
      * Verifies the specified processor can be updated.

http://git-wip-us.apache.org/repos/asf/nifi/blob/eac47e90/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java
index e11f9ad..429592c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java
@@ -58,6 +58,7 @@ import java.util.Set;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
+import java.util.stream.Collectors;
 
 public class StandardProcessorDAO extends ComponentDAO implements ProcessorDAO {
 
@@ -307,9 +308,13 @@ public class StandardProcessorDAO extends ComponentDAO implements ProcessorDAO {
     }
 
     @Override
-    public Set<ProcessorNode> getProcessors(String groupId) {
+    public Set<ProcessorNode> getProcessors(String groupId, boolean includeDescendants) {
         ProcessGroup group = locateProcessGroup(flowController, groupId);
-        return group.getProcessors();
+        if (includeDescendants) {
+            return group.findAllProcessors().stream().collect(Collectors.toSet());
+        } else {
+            return group.getProcessors();
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/eac47e90/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/pom.xml
index 0d60df2..72cd5ed 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/pom.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/pom.xml
@@ -461,6 +461,7 @@
                                                 <include>${staging.dir}/js/nf/canvas/nf-port-configuration.js</include>
                                                 <include>${staging.dir}/js/nf/canvas/nf-port-details.js</include>
                                                 <include>${staging.dir}/js/nf/canvas/nf-process-group-configuration.js</include>
+                                                <include>${staging.dir}/js/nf/canvas/nf-variable-registry.js</include>
                                                 <include>${staging.dir}/js/nf/canvas/nf-component-version.js</include>
                                                 <include>${staging.dir}/js/nf/canvas/nf-remote-process-group-configuration.js</include>
                                                 <include>${staging.dir}/js/nf/canvas/nf-remote-process-group-details.js</include>