You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2017/08/17 14:43:19 UTC

[2/9] nifi git commit: NIFI-4224: - Initial implementation of Process Group level Variable Registry - Updated to incorporate PR Feedback - Changed log message because slf4j-simple apparently has a memory leak; passing a String instead of passing in the C

http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
index a9167ae..0b634a1 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
@@ -16,14 +16,54 @@
  */
 package org.apache.nifi.web.api;
 
-import com.sun.jersey.api.core.ResourceContext;
-import com.sun.jersey.multipart.FormDataParam;
-import com.wordnik.swagger.annotations.Api;
-import com.wordnik.swagger.annotations.ApiOperation;
-import com.wordnik.swagger.annotations.ApiParam;
-import com.wordnik.swagger.annotations.ApiResponse;
-import com.wordnik.swagger.annotations.ApiResponses;
-import com.wordnik.swagger.annotations.Authorization;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import javax.ws.rs.core.UriBuilder;
+import javax.ws.rs.core.UriInfo;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.transform.stream.StreamSource;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.authorization.AuthorizableLookup;
 import org.apache.nifi.authorization.AuthorizeControllerServiceReference;
@@ -38,26 +78,41 @@ import org.apache.nifi.authorization.user.NiFiUser;
 import org.apache.nifi.authorization.user.NiFiUserUtils;
 import org.apache.nifi.bundle.BundleCoordinate;
 import org.apache.nifi.connectable.ConnectableType;
+import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.serialization.FlowEncodingVersion;
+import org.apache.nifi.controller.service.ControllerServiceState;
+import org.apache.nifi.framework.security.util.SslContextFactory;
+import org.apache.nifi.registry.variable.VariableRegistryUpdateRequest;
+import org.apache.nifi.registry.variable.VariableRegistryUpdateStep;
 import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
 import org.apache.nifi.util.BundleUtils;
+import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.web.NiFiServiceFacade;
 import org.apache.nifi.web.ResourceNotFoundException;
 import org.apache.nifi.web.Revision;
+import org.apache.nifi.web.api.dto.AffectedComponentDTO;
 import org.apache.nifi.web.api.dto.BundleDTO;
 import org.apache.nifi.web.api.dto.ConnectionDTO;
 import org.apache.nifi.web.api.dto.ControllerServiceDTO;
+import org.apache.nifi.web.api.dto.DtoFactory;
 import org.apache.nifi.web.api.dto.FlowSnippetDTO;
 import org.apache.nifi.web.api.dto.PositionDTO;
 import org.apache.nifi.web.api.dto.ProcessGroupDTO;
 import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
 import org.apache.nifi.web.api.dto.ProcessorDTO;
 import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
+import org.apache.nifi.web.api.dto.RevisionDTO;
 import org.apache.nifi.web.api.dto.TemplateDTO;
+import org.apache.nifi.web.api.dto.VariableRegistryDTO;
 import org.apache.nifi.web.api.dto.flow.FlowDTO;
+import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO;
+import org.apache.nifi.web.api.dto.status.ProcessGroupStatusSnapshotDTO;
+import org.apache.nifi.web.api.entity.ActivateControllerServicesEntity;
 import org.apache.nifi.web.api.entity.ConnectionEntity;
 import org.apache.nifi.web.api.entity.ConnectionsEntity;
 import org.apache.nifi.web.api.entity.ControllerServiceEntity;
+import org.apache.nifi.web.api.entity.ControllerServicesEntity;
 import org.apache.nifi.web.api.entity.CopySnippetRequestEntity;
 import org.apache.nifi.web.api.entity.CreateTemplateRequestEntity;
 import org.apache.nifi.web.api.entity.FlowEntity;
@@ -70,47 +125,36 @@ import org.apache.nifi.web.api.entity.LabelsEntity;
 import org.apache.nifi.web.api.entity.OutputPortsEntity;
 import org.apache.nifi.web.api.entity.PortEntity;
 import org.apache.nifi.web.api.entity.ProcessGroupEntity;
+import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity;
+import org.apache.nifi.web.api.entity.ProcessGroupStatusSnapshotEntity;
 import org.apache.nifi.web.api.entity.ProcessGroupsEntity;
 import org.apache.nifi.web.api.entity.ProcessorEntity;
 import org.apache.nifi.web.api.entity.ProcessorsEntity;
 import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
 import org.apache.nifi.web.api.entity.RemoteProcessGroupsEntity;
+import org.apache.nifi.web.api.entity.ScheduleComponentsEntity;
 import org.apache.nifi.web.api.entity.TemplateEntity;
+import org.apache.nifi.web.api.entity.VariableRegistryEntity;
+import org.apache.nifi.web.api.entity.VariableRegistryUpdateRequestEntity;
 import org.apache.nifi.web.api.request.ClientIdParameter;
 import org.apache.nifi.web.api.request.LongParameter;
+import org.apache.nifi.web.util.Pause;
+import org.apache.nifi.web.util.WebUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DELETE;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.GET;
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.POST;
-import javax.ws.rs.PUT;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.UriBuilder;
-import javax.ws.rs.core.UriInfo;
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBElement;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Unmarshaller;
-import javax.xml.transform.stream.StreamSource;
-import java.io.InputStream;
-import java.net.URI;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Consumer;
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.api.core.ResourceContext;
+import com.sun.jersey.multipart.FormDataParam;
+import com.wordnik.swagger.annotations.Api;
+import com.wordnik.swagger.annotations.ApiOperation;
+import com.wordnik.swagger.annotations.ApiParam;
+import com.wordnik.swagger.annotations.ApiResponse;
+import com.wordnik.swagger.annotations.ApiResponses;
+import com.wordnik.swagger.annotations.Authorization;
 
 /**
  * RESTful endpoint for managing a Group.
@@ -139,6 +183,22 @@ public class ProcessGroupResource extends ApplicationResource {
     private ConnectionResource connectionResource;
     private TemplateResource templateResource;
     private ControllerServiceResource controllerServiceResource;
+    private DtoFactory dtoFactory;
+
+    private final ConcurrentMap<String, VariableRegistryUpdateRequest> varRegistryUpdateRequests = new ConcurrentHashMap<>();
+    private static final int MAX_VARIABLE_REGISTRY_UPDATE_REQUESTS = 100;
+    private static final long VARIABLE_REGISTRY_UPDATE_REQUEST_EXPIRATION = TimeUnit.MINUTES.toMillis(1L);
+    private final ExecutorService variableRegistryThreadPool = new ThreadPoolExecutor(1, 50, 5L, TimeUnit.SECONDS,
+        new ArrayBlockingQueue<Runnable>(MAX_VARIABLE_REGISTRY_UPDATE_REQUESTS),
+        new ThreadFactory() {
+            @Override
+            public Thread newThread(final Runnable r) {
+                final Thread thread = Executors.defaultThreadFactory().newThread(r);
+                thread.setName("Variable Registry Update Thread");
+                thread.setDaemon(true);
+                return thread;
+            }
+        });
 
     /**
      * Populates the remaining fields in the specified process groups.
@@ -164,6 +224,7 @@ public class ProcessGroupResource extends ApplicationResource {
         return processGroupEntity;
     }
 
+
     /**
      * Populates the remaining content of the specified snippet.
      */
@@ -238,6 +299,49 @@ public class ProcessGroupResource extends ApplicationResource {
         return generateOkResponse(entity).build();
     }
 
+
+    /**
+     * Retrieves the Variable Registry for the group with the given ID
+     *
+     * @param groupId the ID of the Process Group
+     * @return the Variable Registry for the group
+     */
+    @GET
+    @Consumes(MediaType.WILDCARD)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("{id}/variable-registry")
+    @ApiOperation(value = "Gets a process group's variable registry",
+        response = VariableRegistryEntity.class,
+        notes = NON_GUARANTEED_ENDPOINT,
+        authorizations = {
+            @Authorization(value = "Read - /process-groups/{uuid}", type = "")
+        })
+    @ApiResponses(value = {
+        @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+        @ApiResponse(code = 401, message = "Client could not be authenticated."),
+        @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+        @ApiResponse(code = 404, message = "The specified resource could not be found."),
+        @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+    })
+    public Response getVariableRegistry(
+        @ApiParam(value = "The process group id.", required = true) @PathParam("id") final String groupId,
+        @ApiParam(value = "Whether or not to include ancestor groups", required = false) @QueryParam("includeAncestorGroups") @DefaultValue("true") final boolean includeAncestorGroups) {
+
+        if (isReplicateRequest()) {
+            return replicate(HttpMethod.GET);
+        }
+
+        // authorize access
+        serviceFacade.authorizeAccess(lookup -> {
+            final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable();
+            processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
+        });
+
+        // get this process group's variable registry
+        final VariableRegistryEntity entity = serviceFacade.getVariableRegistry(groupId, includeAncestorGroups);
+        return generateOkResponse(entity).build();
+    }
+
     /**
      * Updates the specified process group.
      *
@@ -314,7 +418,7 @@ public class ProcessGroupResource extends ApplicationResource {
                     Authorizable authorizable = lookup.getProcessGroup(id).getAuthorizable();
                     authorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
                 },
-                null,
+                () -> serviceFacade.verifyUpdateProcessGroup(requestProcessGroupDTO),
                 (revision, processGroupEntity) -> {
                     // update the process group
                     final ProcessGroupEntity entity = serviceFacade.updateProcessGroup(revision, processGroupEntity.getComponent());
@@ -325,6 +429,854 @@ public class ProcessGroupResource extends ApplicationResource {
         );
     }
 
+
+    @GET
+    @Consumes(MediaType.WILDCARD)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("{groupId}/variable-registry/update-requests/{updateId}")
+    @ApiOperation(value = "Gets a process group's variable registry",
+        response = VariableRegistryUpdateRequestEntity.class,
+        notes = NON_GUARANTEED_ENDPOINT,
+        authorizations = {
+            @Authorization(value = "Read - /process-groups/{uuid}", type = "")
+        })
+    @ApiResponses(value = {
+        @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+        @ApiResponse(code = 401, message = "Client could not be authenticated."),
+        @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+        @ApiResponse(code = 404, message = "The specified resource could not be found."),
+        @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+    })
+    public Response getVariableRegistryUpdateRequest(
+        @ApiParam(value = "The process group id.", required = true) @PathParam("groupId") final String groupId,
+        @ApiParam(value = "The ID of the Variable Registry Update Request", required = true) @PathParam("updateId") final String updateId) {
+
+        if (groupId == null || updateId == null) {
+            throw new IllegalArgumentException("Group ID and Update ID must both be specified.");
+        }
+
+        if (isReplicateRequest()) {
+            return replicate(HttpMethod.GET);
+        }
+
+        // authorize access
+        serviceFacade.authorizeAccess(lookup -> {
+            final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable();
+            processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
+        });
+
+        final VariableRegistryUpdateRequest request = varRegistryUpdateRequests.get(updateId);
+        if (request == null) {
+            throw new ResourceNotFoundException("Could not find a Variable Registry Update Request with identifier " + updateId);
+        }
+
+        if (!groupId.equals(request.getProcessGroupId())) {
+            throw new ResourceNotFoundException("Could not find a Variable Registry Update Request with identifier " + updateId + " for Process Group with identifier " + groupId);
+        }
+
+        final VariableRegistryUpdateRequestEntity entity = new VariableRegistryUpdateRequestEntity();
+        entity.setRequestDto(dtoFactory.createVariableRegistryUpdateRequestDto(request));
+        entity.getRequestDto().setUri(generateResourceUri("process-groups", groupId, "variable-registry", updateId));
+        return generateOkResponse(entity).build();
+    }
+
+
+    @DELETE
+    @Consumes(MediaType.WILDCARD)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("{groupId}/variable-registry/update-requests/{updateId}")
+    @ApiOperation(value = "Deletes an update request for a process group's variable registry. If the request is not yet complete, it will automatically be cancelled.",
+        response = VariableRegistryUpdateRequestEntity.class,
+        notes = NON_GUARANTEED_ENDPOINT,
+        authorizations = {
+            @Authorization(value = "Read - /process-groups/{uuid}", type = "")
+        })
+    @ApiResponses(value = {
+        @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+        @ApiResponse(code = 401, message = "Client could not be authenticated."),
+        @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+        @ApiResponse(code = 404, message = "The specified resource could not be found."),
+        @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+    })
+    public Response deleteVariableRegistryUpdateRequest(
+        @ApiParam(value = "The process group id.", required = true) @PathParam("groupId") final String groupId,
+        @ApiParam(value = "The ID of the Variable Registry Update Request", required = true) @PathParam("updateId") final String updateId) {
+
+        if (groupId == null || updateId == null) {
+            throw new IllegalArgumentException("Group ID and Update ID must both be specified.");
+        }
+
+        if (isReplicateRequest()) {
+            return replicate(HttpMethod.DELETE);
+        }
+
+        // authorize access
+        serviceFacade.authorizeAccess(lookup -> {
+            final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable();
+            processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
+            processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
+        });
+
+        final VariableRegistryUpdateRequest request = varRegistryUpdateRequests.remove(updateId);
+        if (request == null) {
+            throw new ResourceNotFoundException("Could not find a Variable Registry Update Request with identifier " + updateId);
+        }
+
+        if (!groupId.equals(request.getProcessGroupId())) {
+            throw new ResourceNotFoundException("Could not find a Variable Registry Update Request with identifier " + updateId + " for Process Group with identifier " + groupId);
+        }
+
+        request.cancel();
+
+        final VariableRegistryUpdateRequestEntity entity = new VariableRegistryUpdateRequestEntity();
+        entity.setRequestDto(dtoFactory.createVariableRegistryUpdateRequestDto(request));
+        entity.getRequestDto().setUri(generateResourceUri("process-groups", groupId, "variable-registry", updateId));
+        return generateOkResponse(entity).build();
+    }
+
+
+    @PUT
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("{id}/variable-registry")
+    @ApiOperation(value = "Updates the contents of a Process Group's variable Registry", response = VariableRegistryEntity.class, notes = NON_GUARANTEED_ENDPOINT, authorizations = {
+        @Authorization(value = "Write - /process-groups/{uuid}", type = "")
+    })
+    @ApiResponses(value = {
+        @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+        @ApiResponse(code = 401, message = "Client could not be authenticated."),
+        @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+        @ApiResponse(code = 404, message = "The specified resource could not be found."),
+        @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+    })
+    public Response updateVariableRegistry(
+        @Context final HttpServletRequest httpServletRequest,
+        @ApiParam(value = "The process group id.", required = true) @PathParam("id") final String groupId,
+        @ApiParam(value = "The process group configuration details.", required = true) final VariableRegistryEntity requestEntity) {
+
+        if (requestEntity == null || requestEntity.getVariableRegistry() == null) {
+            throw new IllegalArgumentException("Variable Registry details must be specified.");
+        }
+
+        if (requestEntity.getProcessGroupRevision() == null) {
+            throw new IllegalArgumentException("Process Group Revision must be specified.");
+        }
+
+        // ensure the same id is being used
+        final VariableRegistryDTO registryDto = requestEntity.getVariableRegistry();
+        if (!groupId.equals(registryDto.getProcessGroupId())) {
+            throw new IllegalArgumentException(String.format("The process group id (%s) in the request body does "
+                + "not equal the process group id of the requested resource (%s).", registryDto.getProcessGroupId(), groupId));
+        }
+
+        if (isReplicateRequest()) {
+            return replicate(HttpMethod.PUT, requestEntity);
+        }
+
+        // handle expects request (usually from the cluster manager)
+        final Revision requestRevision = getRevision(requestEntity.getProcessGroupRevision(), groupId);
+        return withWriteLock(
+            serviceFacade,
+            requestEntity,
+            requestRevision,
+            lookup -> {
+                Authorizable authorizable = lookup.getProcessGroup(groupId).getAuthorizable();
+                authorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
+            },
+            null,
+            (revision, processGroupEntity) -> {
+                // update the process group
+                final VariableRegistryEntity entity = serviceFacade.updateVariableRegistry(revision, registryDto);
+                return generateOkResponse(entity).build();
+            });
+    }
+
+
+    /**
+     * Updates the variable registry for the specified process group.
+     *
+     * @param httpServletRequest request
+     * @param groupId The id of the process group.
+     * @param requestEntity the Variable Registry Entity
+     * @return A Variable Registry Entry.
+     */
+    @POST
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("{id}/variable-registry/update-requests")
+    @ApiOperation(value = "Submits a request to update a process group's variable registry",
+        response = VariableRegistryUpdateRequestEntity.class,
+        notes = NON_GUARANTEED_ENDPOINT,
+        authorizations = {
+            @Authorization(value = "Write - /process-groups/{uuid}", type = "")
+        })
+    @ApiResponses(value = {
+        @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+        @ApiResponse(code = 401, message = "Client could not be authenticated."),
+        @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+        @ApiResponse(code = 404, message = "The specified resource could not be found."),
+        @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+    })
+    public Response submitUpdateVariableRegistryRequest(
+        @Context final HttpServletRequest httpServletRequest,
+        @ApiParam(value = "The process group id.", required = true) @PathParam("id") final String groupId,
+        @ApiParam(value = "The process group configuration details.", required = true) final VariableRegistryEntity requestEntity) {
+
+        if (requestEntity == null || requestEntity.getVariableRegistry() == null) {
+            throw new IllegalArgumentException("Variable Registry details must be specified.");
+        }
+
+        if (requestEntity.getProcessGroupRevision() == null) {
+            throw new IllegalArgumentException("Process Group Revision must be specified.");
+        }
+
+        // In order to update variables in a variable registry, we have to perform the following steps:
+        // 1. Determine Affected Components (this includes any Processors and Controller Services and any components that reference an affected Controller Service).
+        //    1a. Determine ID's of components
+        //    1b. Determine Revision's of associated components
+        // 2. Stop All Affected Processors
+        // 3. Disable All Affected Controller Services
+        // 4. Update the Variables
+        // 5. Re-Enable all Affected Controller Services (services only, not dependent components)
+        // 6. Re-Enable all Processors that Depended on the Controller Services
+
+        // Determine the affected components (and their associated revisions)
+        final VariableRegistryEntity computedEntity = serviceFacade.populateAffectedComponents(requestEntity.getVariableRegistry());
+        final VariableRegistryDTO computedRegistryDto = computedEntity.getVariableRegistry();
+        if (computedRegistryDto == null) {
+            throw new ResourceNotFoundException(String.format("Unable to locate group with id '%s'.", groupId));
+        }
+
+        final Set<AffectedComponentDTO> affectedComponents = serviceFacade.getComponentsAffectedByVariableRegistryUpdate(requestEntity.getVariableRegistry());
+
+        final Map<String, List<AffectedComponentDTO>> affectedComponentsByType = affectedComponents.stream()
+            .collect(Collectors.groupingBy(comp -> comp.getComponentType()));
+
+        final List<AffectedComponentDTO> affectedProcessors = affectedComponentsByType.get(AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR);
+        final List<AffectedComponentDTO> affectedServices = affectedComponentsByType.get(AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE);
+
+
+        if (isReplicateRequest()) {
+            // update the variable registry
+            final VariableRegistryUpdateRequest updateRequest = createVariableRegistryUpdateRequest(groupId);
+            updateRequest.getIdentifyRelevantComponentsStep().setComplete(true);
+            final URI originalUri = getAbsolutePath();
+
+            // Submit the task to be run in the background
+            final Runnable taskWrapper = () -> {
+                try {
+                    updateVariableRegistryReplicated(groupId, originalUri, affectedProcessors, affectedServices, updateRequest, requestEntity);
+                } catch (final Exception e) {
+                    logger.error("Failed to update variable registry", e);
+                    updateRequest.setFailureReason("An unexpected error has occurred: " + e);
+                }
+            };
+
+            variableRegistryThreadPool.submit(taskWrapper);
+
+            final VariableRegistryUpdateRequestEntity responseEntity = new VariableRegistryUpdateRequestEntity();
+            responseEntity.setRequestDto(dtoFactory.createVariableRegistryUpdateRequestDto(updateRequest));
+            responseEntity.getRequestDto().setUri(generateResourceUri("process-groups", groupId, "variable-registry", "update-requests", updateRequest.getRequestId()));
+
+            final URI location = URI.create(responseEntity.getRequestDto().getUri());
+            return Response.status(Status.ACCEPTED).location(location).entity(responseEntity).build();
+        }
+
+
+        final Revision requestRevision = getRevision(requestEntity.getProcessGroupRevision(), groupId);
+        return withWriteLock(
+            serviceFacade,
+            requestEntity,
+            requestRevision,
+            lookup -> {
+                final NiFiUser user = NiFiUserUtils.getNiFiUser();
+
+                final Authorizable groupAuthorizable = lookup.getProcessGroup(groupId).getAuthorizable();
+                groupAuthorizable.authorize(authorizer, RequestAction.WRITE, user);
+
+                // For every component that is affected, the user must have READ permissions and WRITE permissions
+                // (because this action requires stopping the component).
+                if (affectedProcessors != null) {
+                    for (final AffectedComponentDTO affectedComponent : affectedProcessors) {
+                        final Authorizable authorizable = lookup.getProcessor(affectedComponent.getComponentId()).getAuthorizable();
+                        authorizable.authorize(authorizer, RequestAction.READ, user);
+                        authorizable.authorize(authorizer, RequestAction.WRITE, user);
+                    }
+                }
+
+                if (affectedServices != null) {
+                    for (final AffectedComponentDTO affectedComponent : affectedServices) {
+                        final Authorizable authorizable = lookup.getControllerService(affectedComponent.getComponentId()).getAuthorizable();
+                        authorizable.authorize(authorizer, RequestAction.READ, user);
+                        authorizable.authorize(authorizer, RequestAction.WRITE, user);
+                    }
+                }
+            },
+            null,
+            (revision, varRegistryEntity) -> {
+                return updateVariableRegistryLocal(groupId, affectedProcessors, affectedServices, requestEntity);
+            });
+    }
+
+    private Pause createPause(final VariableRegistryUpdateRequest updateRequest) {
+        return new Pause() {
+            @Override
+            public boolean pause() {
+                if (updateRequest.isComplete()) {
+                    return false;
+                }
+
+                try {
+                    Thread.sleep(500);
+                } catch (final InterruptedException ie) {
+                    Thread.currentThread().interrupt();
+                    return false;
+                }
+
+                return !updateRequest.isComplete();
+            }
+        };
+    }
+
+    private void updateVariableRegistryReplicated(final String groupId, final URI originalUri, final Collection<AffectedComponentDTO> affectedProcessors,
+        final Collection<AffectedComponentDTO> affectedServices,
+        final VariableRegistryUpdateRequest updateRequest, final VariableRegistryEntity requestEntity) {
+
+        final NiFiProperties properties = getProperties();
+        final Client jerseyClient = WebUtils.createClient(new DefaultClientConfig(), SslContextFactory.createSslContext(properties));
+        final int connectionTimeout = (int) FormatUtils.getTimeDuration(properties.getClusterNodeConnectionTimeout(), TimeUnit.MILLISECONDS);
+        final int readTimeout = (int) FormatUtils.getTimeDuration(properties.getClusterNodeReadTimeout(), TimeUnit.MILLISECONDS);
+        jerseyClient.getProperties().put(ClientConfig.PROPERTY_CONNECT_TIMEOUT, connectionTimeout);
+        jerseyClient.getProperties().put(ClientConfig.PROPERTY_READ_TIMEOUT, readTimeout);
+        jerseyClient.getProperties().put(ClientConfig.PROPERTY_FOLLOW_REDIRECTS, Boolean.TRUE);
+
+        final Pause pause = createPause(updateRequest);
+
+        // stop processors
+        if (affectedProcessors != null) {
+            logger.info("In order to update Variable Registry for Process Group with ID {}, "
+                + "replicating request to stop {} affected processors", groupId, affectedProcessors.size());
+
+            scheduleProcessors(groupId, originalUri, jerseyClient, updateRequest, pause,
+                affectedProcessors, ScheduledState.STOPPED, updateRequest.getStopProcessorsStep());
+        }
+
+        // disable controller services
+        if (affectedServices != null) {
+            logger.info("In order to update Variable Registry for Process Group with ID {}, "
+                + "replicating request to stop {} affected Controller Services", groupId, affectedServices.size());
+
+            activateControllerServices(groupId, originalUri, jerseyClient, updateRequest, pause,
+                affectedServices, ControllerServiceState.DISABLED, updateRequest.getDisableServicesStep());
+        }
+
+        // apply updates
+        logger.info("In order to update Variable Registry for Process Group with ID {}, "
+            + "replicating request to apply updates to variable registry", groupId);
+        applyVariableRegistryUpdate(groupId, originalUri, jerseyClient, updateRequest, requestEntity);
+
+        // re-enable controller services
+        if (affectedServices != null) {
+            logger.info("In order to update Variable Registry for Process Group with ID {}, "
+                + "replicating request to re-enable {} affected services", groupId, affectedServices.size());
+
+            activateControllerServices(groupId, originalUri, jerseyClient, updateRequest, pause,
+                affectedServices, ControllerServiceState.ENABLED, updateRequest.getEnableServicesStep());
+        }
+
+        // restart processors
+        if (affectedProcessors != null) {
+            logger.info("In order to update Variable Registry for Process Group with ID {}, "
+                + "replicating request to restart {} affected processors", groupId, affectedProcessors.size());
+
+            scheduleProcessors(groupId, originalUri, jerseyClient, updateRequest, pause,
+                affectedProcessors, ScheduledState.RUNNING, updateRequest.getStartProcessorsStep());
+        }
+
+        updateRequest.setComplete(true);
+    }
+
+    /**
+     * Periodically polls the process group with the given ID, waiting for all processors whose ID's are given to have the given Scheduled State.
+     *
+     * @param client the Jersey Client to use for making the request
+     * @param groupId the ID of the Process Group to poll
+     * @param processorIds the ID of all Processors whose state should be equal to the given desired state
+     * @param desiredState the desired state for all processors with the ID's given
+     * @param pause the Pause that can be used to wait between polling
+     * @return <code>true</code> if successful, <code>false</code> if unable to wait for processors to reach the desired state
+     */
+    private boolean waitForProcessorStatus(final Client client, final URI originalUri, final String groupId, final Set<String> processorIds, final ScheduledState desiredState, final Pause pause) {
+        URI groupUri;
+        try {
+            groupUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(),
+                originalUri.getPort(), "/nifi-api/flow/process-groups/" + groupId + "/status", "recursive=true", originalUri.getFragment());
+        } catch (URISyntaxException e) {
+            throw new RuntimeException(e);
+        }
+
+        boolean continuePolling = true;
+        while (continuePolling) {
+            final ClientResponse response = client.resource(groupUri).header("Content-Type", "application/json").get(ClientResponse.class);
+            if (response.getStatus() != Status.OK.getStatusCode()) {
+                return false;
+            }
+
+            final ProcessGroupStatusEntity statusEntity = response.getEntity(ProcessGroupStatusEntity.class);
+            final ProcessGroupStatusDTO statusDto = statusEntity.getProcessGroupStatus();
+            final ProcessGroupStatusSnapshotDTO statusSnapshotDto = statusDto.getAggregateSnapshot();
+
+            if (isProcessorStatusEqual(statusSnapshotDto, processorIds, desiredState)) {
+                logger.debug("All {} processors of interest now have the desired state of {}", processorIds.size(), desiredState);
+                return true;
+            }
+
+            // Not all of the processors are in the desired state. Pause for a bit and poll again.
+            continuePolling = pause.pause();
+        }
+
+        return false;
+    }
+
+    /**
+     * Periodically polls the process group with the given ID, waiting for all processors whose ID's are given to have the given Scheduled State.
+     *
+     * @param groupId the ID of the Process Group to poll
+     * @param processorIds the ID of all Processors whose state should be equal to the given desired state
+     * @param desiredState the desired state for all processors with the ID's given
+     * @param pause the Pause that can be used to wait between polling
+     * @return <code>true</code> if successful, <code>false</code> if unable to wait for processors to reach the desired state
+     */
+    private boolean waitForLocalProcessorStatus(final String groupId, final Set<String> processorIds, final ScheduledState desiredState, final Pause pause) {
+        boolean continuePolling = true;
+        while (continuePolling) {
+            final ProcessGroupStatusEntity statusEntity = serviceFacade.getProcessGroupStatus(groupId, true);
+            final ProcessGroupStatusDTO statusDto = statusEntity.getProcessGroupStatus();
+            final ProcessGroupStatusSnapshotDTO statusSnapshotDto = statusDto.getAggregateSnapshot();
+
+            if (isProcessorStatusEqual(statusSnapshotDto, processorIds, desiredState)) {
+                logger.debug("All {} processors of interest now have the desired state of {}", processorIds.size(), desiredState);
+                return true;
+            }
+
+            // Not all of the processors are in the desired state. Pause for a bit and poll again.
+            continuePolling = pause.pause();
+        }
+
+        return false;
+    }
+
+    private boolean isProcessorStatusEqual(final ProcessGroupStatusSnapshotDTO statusSnapshot, final Set<String> processorIds, final ScheduledState desiredState) {
+        final String desiredStateName = desiredState.name();
+
+        final boolean allProcessorsMatch = statusSnapshot.getProcessorStatusSnapshots().stream()
+            .map(entity -> entity.getProcessorStatusSnapshot())
+            .filter(status -> processorIds.contains(status.getId()))
+            .allMatch(status -> {
+                final String runStatus = status.getRunStatus();
+                final boolean stateMatches = desiredStateName.equalsIgnoreCase(runStatus);
+                if (!stateMatches) {
+                    return false;
+                }
+
+                if (desiredState == ScheduledState.STOPPED && status.getActiveThreadCount() != 0) {
+                    return false;
+                }
+
+                return true;
+            });
+
+        if (!allProcessorsMatch) {
+            return false;
+        }
+
+        for (final ProcessGroupStatusSnapshotEntity childGroupEntity : statusSnapshot.getProcessGroupStatusSnapshots()) {
+            final ProcessGroupStatusSnapshotDTO childGroupStatus = childGroupEntity.getProcessGroupStatusSnapshot();
+            final boolean allMatchChildLevel = isProcessorStatusEqual(childGroupStatus, processorIds, desiredState);
+            if (!allMatchChildLevel) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+
+
+    /**
+     * Periodically polls the process group with the given ID, waiting for all controller services whose ID's are given to have the given Controller Service State.
+     *
+     * @param client the Jersey Client to use for making the HTTP Request
+     * @param groupId the ID of the Process Group to poll
+     * @param serviceIds the ID of all Controller Services whose state should be equal to the given desired state
+     * @param desiredState the desired state for all services with the ID's given
+     * @param pause the Pause that can be used to wait between polling
+     * @return <code>true</code> if successful, <code>false</code> if unable to wait for services to reach the desired state
+     */
+    private boolean waitForControllerServiceStatus(final Client client, final URI originalUri, final String groupId, final Set<String> serviceIds, final ControllerServiceState desiredState,
+        final Pause pause) {
+        URI groupUri;
+        try {
+            groupUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(),
+                originalUri.getPort(), "/nifi-api/flow/process-groups/" + groupId + "/controller-services", "includeAncestorGroups=false,includeDescendantGroups=true", originalUri.getFragment());
+        } catch (URISyntaxException e) {
+            throw new RuntimeException(e);
+        }
+
+        boolean continuePolling = true;
+        while (continuePolling) {
+            final ClientResponse response = client.resource(groupUri).header("Content-Type", "application/json").get(ClientResponse.class);
+            if (response.getStatus() != Status.OK.getStatusCode()) {
+                return false;
+            }
+
+            final ControllerServicesEntity controllerServicesEntity = response.getEntity(ControllerServicesEntity.class);
+            final Set<ControllerServiceEntity> serviceEntities = controllerServicesEntity.getControllerServices();
+
+            final String desiredStateName = desiredState.name();
+            final boolean allServicesMatch = serviceEntities.stream()
+                .map(entity -> entity.getComponent())
+                .filter(service -> serviceIds.contains(service.getId()))
+                .map(service -> service.getState())
+                .allMatch(state -> state.equals(desiredStateName));
+
+            if (allServicesMatch) {
+                logger.debug("All {} controller services of interest now have the desired state of {}", serviceIds.size(), desiredState);
+                return true;
+            }
+
+            // Not all of the processors are in the desired state. Pause for a bit and poll again.
+            continuePolling = pause.pause();
+        }
+
+        return false;
+    }
+
+
+    /**
+     * Periodically polls the process group with the given ID, waiting for all controller services whose ID's are given to have the given Controller Service State.
+     *
+     * @param groupId the ID of the Process Group to poll
+     * @param serviceIds the ID of all Controller Services whose state should be equal to the given desired state
+     * @param desiredState the desired state for all services with the ID's given
+     * @param pause the Pause that can be used to wait between polling
+     * @param user the user that is retrieving the controller services
+     * @return <code>true</code> if successful, <code>false</code> if unable to wait for services to reach the desired state
+     */
+    private boolean waitForLocalControllerServiceStatus(final String groupId, final Set<String> serviceIds, final ControllerServiceState desiredState, final Pause pause, final NiFiUser user) {
+        boolean continuePolling = true;
+        while (continuePolling) {
+            final Set<ControllerServiceEntity> serviceEntities = serviceFacade.getControllerServices(groupId, false, true, user);
+
+            final String desiredStateName = desiredState.name();
+            final boolean allServicesMatch = serviceEntities.stream()
+                .map(entity -> entity.getComponent())
+                .filter(service -> serviceIds.contains(service.getId()))
+                .map(service -> service.getState())
+                .allMatch(state -> desiredStateName.equals(state));
+
+            if (allServicesMatch) {
+                logger.debug("All {} controller services of interest now have the desired state of {}", serviceIds.size(), desiredState);
+                return true;
+            }
+
+            // Not all of the processors are in the desired state. Pause for a bit and poll again.
+            continuePolling = pause.pause();
+        }
+
+        return false;
+    }
+
+    private VariableRegistryUpdateRequest createVariableRegistryUpdateRequest(final String groupId) {
+        final VariableRegistryUpdateRequest updateRequest = new VariableRegistryUpdateRequest(UUID.randomUUID().toString(), groupId);
+
+        // before adding to the request map, purge any old requests. Must do this by creating a List of ID's
+        // and then removing those ID's one-at-a-time in order to avoid ConcurrentModificationException.
+        final Date oneMinuteAgo = new Date(System.currentTimeMillis() - VARIABLE_REGISTRY_UPDATE_REQUEST_EXPIRATION);
+        final List<String> completedRequestIds = varRegistryUpdateRequests.entrySet().stream()
+            .filter(entry -> entry.getValue().isComplete())
+            .filter(entry -> entry.getValue().getLastUpdated().before(oneMinuteAgo))
+            .map(Map.Entry::getKey)
+            .collect(Collectors.toList());
+
+        completedRequestIds.stream().forEach(id -> varRegistryUpdateRequests.remove(id));
+
+        final int requestCount = varRegistryUpdateRequests.size();
+        if (requestCount > MAX_VARIABLE_REGISTRY_UPDATE_REQUESTS) {
+            throw new IllegalStateException("There are already " + requestCount + " update requests for variable registries. "
+                + "Cannot issue any more requests until the older ones are deleted or expire");
+        }
+
+        this.varRegistryUpdateRequests.put(updateRequest.getRequestId(), updateRequest);
+        return updateRequest;
+    }
+
+    private Response updateVariableRegistryLocal(final String groupId, final List<AffectedComponentDTO> affectedProcessors, final List<AffectedComponentDTO> affectedServices,
+        final VariableRegistryEntity requestEntity) {
+
+        final Set<String> affectedProcessorIds = affectedProcessors == null ? Collections.emptySet() : affectedProcessors.stream()
+            .map(component -> component.getComponentId())
+            .collect(Collectors.toSet());
+        Map<String, Revision> processorRevisionMap = getRevisions(groupId, affectedProcessorIds);
+
+        final Set<String> affectedServiceIds = affectedServices == null ? Collections.emptySet() : affectedServices.stream()
+            .map(component -> component.getComponentId())
+            .collect(Collectors.toSet());
+        Map<String, Revision> serviceRevisionMap = getRevisions(groupId, affectedServiceIds);
+
+        // update the variable registry
+        final VariableRegistryUpdateRequest updateRequest = createVariableRegistryUpdateRequest(groupId);
+        updateRequest.getIdentifyRelevantComponentsStep().setComplete(true);
+        final Pause pause = createPause(updateRequest);
+
+        final Revision requestRevision = getRevision(requestEntity.getProcessGroupRevision(), groupId);
+
+        final NiFiUser user = NiFiUserUtils.getNiFiUser();
+        final Runnable updateTask = new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    // Stop processors
+                    performUpdateVariableRegistryStep(groupId, updateRequest, updateRequest.getStopProcessorsStep(), "Stopping Processors",
+                        () -> stopProcessors(user, updateRequest, groupId, processorRevisionMap, pause));
+
+                    // Update revision map because this will have modified the revisions of our components.
+                    final Map<String, Revision> updatedProcessorRevisionMap = getRevisions(groupId, affectedProcessorIds);
+
+                    // Disable controller services
+                    performUpdateVariableRegistryStep(groupId, updateRequest, updateRequest.getDisableServicesStep(), "Disabling Controller Services",
+                        () -> disableControllerServices(user, updateRequest, groupId, serviceRevisionMap, pause));
+
+                    // Update revision map because this will have modified the revisions of our components.
+                    final Map<String, Revision> updatedServiceRevisionMap = getRevisions(groupId, affectedServiceIds);
+
+                    // Apply the updates
+                    performUpdateVariableRegistryStep(groupId, updateRequest, updateRequest.getApplyUpdatesStep(), "Applying updates to Variable Registry",
+                        () -> serviceFacade.updateVariableRegistry(user, requestRevision, requestEntity.getVariableRegistry()));
+
+                    // Re-enable the controller services
+                    performUpdateVariableRegistryStep(groupId, updateRequest, updateRequest.getEnableServicesStep(), "Re-enabling Controller Services",
+                        () -> enableControllerServices(user, groupId, updatedServiceRevisionMap, pause));
+
+                    // Restart processors
+                    performUpdateVariableRegistryStep(groupId, updateRequest, updateRequest.getStartProcessorsStep(), "Restarting Processors",
+                        () -> startProcessors(user, groupId, updatedProcessorRevisionMap, pause));
+
+                    // Set complete
+                    updateRequest.setComplete(true);
+                    updateRequest.setLastUpdated(new Date());
+                } catch (final Exception e) {
+                    logger.error("Failed to update Variable Registry for Proces Group with ID " + groupId, e);
+                    updateRequest.setFailureReason("An unexpected error has occurred: " + e);
+                }
+            }
+        };
+
+        // Submit the task to be run in the background
+        variableRegistryThreadPool.submit(updateTask);
+
+        final VariableRegistryUpdateRequestEntity responseEntity = new VariableRegistryUpdateRequestEntity();
+        responseEntity.setRequestDto(dtoFactory.createVariableRegistryUpdateRequestDto(updateRequest));
+        responseEntity.getRequestDto().setUri(generateResourceUri("process-groups", groupId, "variable-registry", "update-requests", updateRequest.getRequestId()));
+
+        final URI location = URI.create(responseEntity.getRequestDto().getUri());
+        return Response.status(Status.ACCEPTED).location(location).entity(responseEntity).build();
+    }
+
+    private Map<String, Revision> getRevisions(final String groupId, final Set<String> componentIds) {
+        final Set<Revision> processorRevisions = serviceFacade.getRevisionsFromGroup(groupId, group -> componentIds);
+        return processorRevisions.stream().collect(Collectors.toMap(revision -> revision.getComponentId(), Function.identity()));
+    }
+
+    private void performUpdateVariableRegistryStep(final String groupId, final VariableRegistryUpdateRequest request, final VariableRegistryUpdateStep step,
+        final String stepDescription, final Runnable action) {
+
+        if (request.isComplete()) {
+            logger.info("In updating Variable Registry for Process Group with ID {}"
+                + ", skipping the following step because the request has completed already: {}", groupId, stepDescription);
+            return;
+        }
+
+        try {
+            logger.info("In order to update Variable Registry for Process Group with ID {}, {}", groupId, stepDescription);
+
+            action.run();
+            step.setComplete(true);
+        } catch (final Exception e) {
+            request.setComplete(true);
+            logger.error("Failed to update variable registry for Process Group with ID {}", groupId, e);
+
+            step.setComplete(true);
+            step.setFailureReason(e.getMessage());
+            request.setFailureReason("Failed to update Variable Registry because failed while performing step: " + stepDescription);
+        }
+
+        request.setLastUpdated(new Date());
+    }
+
+    private void stopProcessors(final NiFiUser user, final VariableRegistryUpdateRequest updateRequest, final String processGroupId,
+        final Map<String, Revision> processorRevisions, final Pause pause) {
+
+        if (processorRevisions.isEmpty()) {
+            return;
+        }
+
+        serviceFacade.verifyScheduleComponents(processGroupId, ScheduledState.STOPPED, processorRevisions.keySet());
+        serviceFacade.scheduleComponents(user, processGroupId, ScheduledState.STOPPED, processorRevisions);
+        waitForLocalProcessorStatus(processGroupId, processorRevisions.keySet(), ScheduledState.STOPPED, pause);
+    }
+
+    private void startProcessors(final NiFiUser user, final String processGroupId, final Map<String, Revision> processorRevisions, final Pause pause) {
+        if (processorRevisions.isEmpty()) {
+            return;
+        }
+
+        serviceFacade.verifyScheduleComponents(processGroupId, ScheduledState.RUNNING, processorRevisions.keySet());
+        serviceFacade.scheduleComponents(user, processGroupId, ScheduledState.RUNNING, processorRevisions);
+        waitForLocalProcessorStatus(processGroupId, processorRevisions.keySet(), ScheduledState.RUNNING, pause);
+    }
+
+    private void disableControllerServices(final NiFiUser user, final VariableRegistryUpdateRequest updateRequest, final String processGroupId,
+        final Map<String, Revision> serviceRevisions, final Pause pause) {
+
+        if (serviceRevisions.isEmpty()) {
+            return;
+        }
+
+        serviceFacade.verifyActivateControllerServices(processGroupId, ControllerServiceState.DISABLED, serviceRevisions.keySet());
+        serviceFacade.activateControllerServices(user, processGroupId, ControllerServiceState.DISABLED, serviceRevisions);
+        waitForLocalControllerServiceStatus(processGroupId, serviceRevisions.keySet(), ControllerServiceState.DISABLED, pause, user);
+    }
+
+    private void enableControllerServices(final NiFiUser user, final String processGroupId, final Map<String, Revision> serviceRevisions, final Pause pause) {
+        if (serviceRevisions.isEmpty()) {
+            return;
+        }
+
+        serviceFacade.verifyActivateControllerServices(processGroupId, ControllerServiceState.ENABLED, serviceRevisions.keySet());
+        serviceFacade.activateControllerServices(user, processGroupId, ControllerServiceState.ENABLED, serviceRevisions);
+        waitForLocalControllerServiceStatus(processGroupId, serviceRevisions.keySet(), ControllerServiceState.ENABLED, pause, user);
+    }
+
+
+    private void scheduleProcessors(final String groupId, final URI originalUri, final Client jerseyClient, final VariableRegistryUpdateRequest updateRequest,
+        final Pause pause, final Collection<AffectedComponentDTO> affectedProcessors, final ScheduledState desiredState, final VariableRegistryUpdateStep updateStep) {
+        final Set<String> affectedProcessorIds = affectedProcessors.stream()
+            .map(component -> component.getComponentId())
+            .collect(Collectors.toSet());
+
+        final Map<String, Revision> processorRevisionMap = getRevisions(groupId, affectedProcessorIds);
+        final Map<String, RevisionDTO> processorRevisionDtoMap = processorRevisionMap.entrySet().stream().collect(
+            Collectors.toMap(Map.Entry::getKey, entry -> dtoFactory.createRevisionDTO(entry.getValue())));
+
+        final ScheduleComponentsEntity stopProcessorsEntity = new ScheduleComponentsEntity();
+        stopProcessorsEntity.setComponents(processorRevisionDtoMap);
+        stopProcessorsEntity.setId(groupId);
+        stopProcessorsEntity.setState(desiredState.name());
+
+        URI stopProcessorUri;
+        try {
+            stopProcessorUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(),
+                originalUri.getPort(), "/nifi-api/flow/process-groups/" + groupId, null, originalUri.getFragment());
+        } catch (URISyntaxException e) {
+            throw new RuntimeException(e);
+        }
+
+        final ClientResponse stopProcessorResponse = jerseyClient.resource(stopProcessorUri)
+            .header("Content-Type", "application/json")
+            .entity(stopProcessorsEntity)
+            .put(ClientResponse.class);
+
+        final int stopProcessorStatus = stopProcessorResponse.getStatus();
+        if (stopProcessorStatus != Status.OK.getStatusCode()) {
+            updateRequest.getStopProcessorsStep().setFailureReason("Failed while " + updateStep.getDescription());
+            updateRequest.setFailureReason("Failed while " + updateStep.getDescription());
+            return;
+        }
+
+        updateRequest.setLastUpdated(new Date());
+        final boolean processorsTransitioned = waitForProcessorStatus(jerseyClient, originalUri, groupId, affectedProcessorIds, desiredState, pause);
+        if (processorsTransitioned) {
+            updateStep.setComplete(true);
+        } else {
+            updateStep.setFailureReason("Failed while " + updateStep.getDescription());
+            updateRequest.setFailureReason("Failed while " + updateStep.getDescription());
+            return;
+        }
+    }
+
+    private void activateControllerServices(final String groupId, final URI originalUri, final Client jerseyClient, final VariableRegistryUpdateRequest updateRequest,
+        final Pause pause, final Collection<AffectedComponentDTO> affectedServices, final ControllerServiceState desiredState, final VariableRegistryUpdateStep updateStep) {
+
+        final Set<String> affectedServiceIds = affectedServices.stream()
+            .map(component -> component.getComponentId())
+            .collect(Collectors.toSet());
+
+        final Map<String, Revision> serviceRevisionMap = getRevisions(groupId, affectedServiceIds);
+        final Map<String, RevisionDTO> serviceRevisionDtoMap = serviceRevisionMap.entrySet().stream().collect(
+            Collectors.toMap(Map.Entry::getKey, entry -> dtoFactory.createRevisionDTO(entry.getValue())));
+
+        final ActivateControllerServicesEntity disableServicesEntity = new ActivateControllerServicesEntity();
+        disableServicesEntity.setComponents(serviceRevisionDtoMap);
+        disableServicesEntity.setId(groupId);
+        disableServicesEntity.setState(desiredState.name());
+
+        URI disableServicesUri;
+        try {
+            disableServicesUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(),
+                originalUri.getPort(), "/nifi-api/flow/process-groups/" + groupId + "/controller-services", null, originalUri.getFragment());
+        } catch (URISyntaxException e) {
+            throw new RuntimeException(e);
+        }
+
+        final ClientResponse disableServicesResponse = jerseyClient.resource(disableServicesUri)
+            .header("Content-Type", "application/json")
+            .entity(disableServicesEntity)
+            .put(ClientResponse.class);
+
+        final int disableServicesStatus = disableServicesResponse.getStatus();
+        if (disableServicesStatus != Status.OK.getStatusCode()) {
+            updateStep.setFailureReason("Failed while " + updateStep.getDescription());
+            updateRequest.setFailureReason("Failed while " + updateStep.getDescription());
+            return;
+        }
+
+        updateRequest.setLastUpdated(new Date());
+        if (waitForControllerServiceStatus(jerseyClient, originalUri, groupId, affectedServiceIds, desiredState, pause)) {
+            updateStep.setComplete(true);
+        } else {
+            updateStep.setFailureReason("Failed while " + updateStep.getDescription());
+            updateRequest.setFailureReason("Failed while " + updateStep.getDescription());
+            return;
+        }
+    }
+
+
+    private void applyVariableRegistryUpdate(final String groupId, final URI originalUri, final Client jerseyClient, final VariableRegistryUpdateRequest updateRequest,
+        final VariableRegistryEntity updateEntity) {
+
+        URI applyUpdatesUri;
+        try {
+            applyUpdatesUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(),
+                originalUri.getPort(), "/nifi-api/process-groups/" + groupId + "/variable-registry", null, originalUri.getFragment());
+        } catch (URISyntaxException e) {
+            throw new RuntimeException(e);
+        }
+
+        final ClientResponse applyUpdatesResponse = jerseyClient.resource(applyUpdatesUri)
+            .header("Content-Type", "application/json")
+            .entity(updateEntity)
+            .put(ClientResponse.class);
+
+        final int applyUpdatesStatus = applyUpdatesResponse.getStatus();
+        updateRequest.setLastUpdated(new Date());
+        if (applyUpdatesStatus != Status.OK.getStatusCode()) {
+            updateRequest.getApplyUpdatesStep().setFailureReason("Failed to apply updates to the Variable Registry");
+            updateRequest.setFailureReason("Failed to apply updates to the Variable Registry");
+            return;
+        }
+    }
+
     /**
      * Removes the specified process group reference.
      *
@@ -2426,4 +3378,8 @@ public class ProcessGroupResource extends ApplicationResource {
     public void setAuthorizer(Authorizer authorizer) {
         this.authorizer = authorizer;
     }
+
+    public void setDtoFactory(DtoFactory dtoFactory) {
+        this.dtoFactory = dtoFactory;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index 4332a0c..a445e49 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -16,6 +16,33 @@
  */
 package org.apache.nifi.web.api.dto;
 
+import java.text.Collator;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import javax.ws.rs.WebApplicationException;
+
 import org.apache.commons.lang3.ClassUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.action.Action;
@@ -47,6 +74,7 @@ import org.apache.nifi.authorization.Resource;
 import org.apache.nifi.authorization.User;
 import org.apache.nifi.authorization.resource.Authorizable;
 import org.apache.nifi.authorization.resource.ComponentAuthorizable;
+import org.apache.nifi.authorization.user.NiFiUser;
 import org.apache.nifi.authorization.user.NiFiUserUtils;
 import org.apache.nifi.bundle.Bundle;
 import org.apache.nifi.bundle.BundleCoordinate;
@@ -110,6 +138,9 @@ import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
 import org.apache.nifi.provenance.lineage.LineageEdge;
 import org.apache.nifi.provenance.lineage.LineageNode;
 import org.apache.nifi.provenance.lineage.ProvenanceEventLineageNode;
+import org.apache.nifi.registry.ComponentVariableRegistry;
+import org.apache.nifi.registry.variable.VariableRegistryUpdateRequest;
+import org.apache.nifi.registry.variable.VariableRegistryUpdateStep;
 import org.apache.nifi.remote.RemoteGroupPort;
 import org.apache.nifi.remote.RootGroupPort;
 import org.apache.nifi.reporting.Bulletin;
@@ -161,35 +192,10 @@ import org.apache.nifi.web.api.entity.ProcessGroupStatusSnapshotEntity;
 import org.apache.nifi.web.api.entity.ProcessorStatusSnapshotEntity;
 import org.apache.nifi.web.api.entity.RemoteProcessGroupStatusSnapshotEntity;
 import org.apache.nifi.web.api.entity.TenantEntity;
+import org.apache.nifi.web.api.entity.VariableEntity;
 import org.apache.nifi.web.controller.ControllerFacade;
 import org.apache.nifi.web.revision.RevisionManager;
 
-import javax.ws.rs.WebApplicationException;
-import java.text.Collator;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TimeZone;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-
 public final class DtoFactory {
 
     @SuppressWarnings("rawtypes")
@@ -1712,9 +1718,34 @@ public final class DtoFactory {
      * @return dto
      */
     public PermissionsDTO createPermissionsDto(final Authorizable authorizable) {
+        return createPermissionsDto(authorizable, NiFiUserUtils.getNiFiUser());
+    }
+
+    /**
+     * Creates the PermissionsDTO based on the specified Authorizable for the given user
+     *
+     * @param authorizable authorizable
+     * @param user the NiFi User for which the Permissions are being created
+     * @return dto
+     */
+    public PermissionsDTO createPermissionsDto(final Authorizable authorizable, final NiFiUser user) {
         final PermissionsDTO dto = new PermissionsDTO();
-        dto.setCanRead(authorizable.isAuthorized(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()));
-        dto.setCanWrite(authorizable.isAuthorized(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()));
+        dto.setCanRead(authorizable.isAuthorized(authorizer, RequestAction.READ, user));
+        dto.setCanWrite(authorizable.isAuthorized(authorizer, RequestAction.WRITE, user));
+        return dto;
+    }
+
+    public AffectedComponentDTO createAffectedComponentDto(final ConfiguredComponent component) {
+        final AffectedComponentDTO dto = new AffectedComponentDTO();
+        dto.setComponentId(component.getIdentifier());
+        dto.setParentGroupId(component.getProcessGroupIdentifier());
+
+        if (component instanceof ProcessorNode) {
+            dto.setComponentType(AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR);
+        } else if (component instanceof ControllerServiceNode) {
+            dto.setComponentType(AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE);
+        }
+
         return dto;
     }
 
@@ -1998,6 +2029,10 @@ public final class DtoFactory {
         dto.setComments(group.getComments());
         dto.setName(group.getName());
 
+        final Map<String, String> variables = group.getVariableRegistry().getVariableMap().entrySet().stream()
+            .collect(Collectors.toMap(entry -> entry.getKey().getName(), entry -> entry.getValue()));
+        group.setVariables(variables);
+
         final ProcessGroup parentGroup = group.getParent();
         if (parentGroup != null) {
             dto.setParentGroupId(parentGroup.getIdentifier());
@@ -2079,6 +2114,128 @@ public final class DtoFactory {
         return deprecationNotice == null ? null : deprecationNotice.reason();
     }
 
+
+    public VariableRegistryDTO createVariableRegistryDto(final ProcessGroup processGroup) {
+        final ComponentVariableRegistry variableRegistry = processGroup.getVariableRegistry();
+
+        final List<String> variableNames = variableRegistry.getVariableMap().keySet().stream()
+            .map(descriptor -> descriptor.getName())
+            .collect(Collectors.toList());
+
+        final Set<VariableEntity> variableEntities = new LinkedHashSet<>();
+
+        for (final String variableName : variableNames) {
+            final VariableDTO variableDto = new VariableDTO();
+            variableDto.setName(variableName);
+            variableDto.setValue(variableRegistry.getVariableValue(variableName));
+            variableDto.setProcessGroupId(processGroup.getIdentifier());
+
+            final Set<ConfiguredComponent> affectedComponents = processGroup.getComponentsAffectedByVariable(variableName);
+            final Set<AffectedComponentDTO> affectedComponentDtos = affectedComponents.stream()
+                .map(component -> createAffectedComponentDto(component))
+                .collect(Collectors.toSet());
+
+            boolean canWrite = true;
+            for (final ConfiguredComponent component : affectedComponents) {
+                final PermissionsDTO permissions = createPermissionsDto(component);
+                if (!permissions.getCanRead() || !permissions.getCanWrite()) {
+                    canWrite = false;
+                    break;
+                }
+            }
+
+            variableDto.setAffectedComponents(affectedComponentDtos);
+
+            final VariableEntity variableEntity = new VariableEntity();
+            variableEntity.setVariable(variableDto);
+            variableEntity.setCanWrite(canWrite);
+
+            variableEntities.add(variableEntity);
+        }
+
+        final VariableRegistryDTO registryDto = new VariableRegistryDTO();
+        registryDto.setProcessGroupId(processGroup.getIdentifier());
+        registryDto.setVariables(variableEntities);
+
+        return registryDto;
+    }
+
+    public VariableRegistryUpdateRequestDTO createVariableRegistryUpdateRequestDto(final VariableRegistryUpdateRequest request) {
+        final VariableRegistryUpdateRequestDTO dto = new VariableRegistryUpdateRequestDTO();
+        dto.setComplete(request.isComplete());
+        dto.setFailureReason(request.getFailureReason());
+        dto.setLastUpdated(request.getLastUpdated());
+        dto.setProcessGroupId(request.getProcessGroupId());
+        dto.setRequestId(request.getRequestId());
+        dto.setSubmissionTime(request.getSubmissionTime());
+
+        final List<VariableRegistryUpdateStepDTO> updateSteps = new ArrayList<>();
+        updateSteps.add(createVariableRegistryUpdateStepDto(request.getIdentifyRelevantComponentsStep()));
+        updateSteps.add(createVariableRegistryUpdateStepDto(request.getStopProcessorsStep()));
+        updateSteps.add(createVariableRegistryUpdateStepDto(request.getDisableServicesStep()));
+        updateSteps.add(createVariableRegistryUpdateStepDto(request.getApplyUpdatesStep()));
+        updateSteps.add(createVariableRegistryUpdateStepDto(request.getEnableServicesStep()));
+        updateSteps.add(createVariableRegistryUpdateStepDto(request.getStartProcessorsStep()));
+        dto.setUpdateSteps(updateSteps);
+
+        return dto;
+    }
+
+    public VariableRegistryUpdateStepDTO createVariableRegistryUpdateStepDto(final VariableRegistryUpdateStep step) {
+        final VariableRegistryUpdateStepDTO dto = new VariableRegistryUpdateStepDTO();
+        dto.setComplete(step.isComplete());
+        dto.setDescription(step.getDescription());
+        dto.setFailureReason(step.getFailureReason());
+        return dto;
+    }
+
+
+    public VariableRegistryDTO populateAffectedComponents(final VariableRegistryDTO variableRegistry, final ProcessGroup group) {
+        if (!group.getIdentifier().equals(variableRegistry.getProcessGroupId())) {
+            throw new IllegalArgumentException("Variable Registry does not have the same Group ID as the given Process Group");
+        }
+
+        final Set<VariableEntity> variableEntities = new LinkedHashSet<>();
+
+        for (final VariableEntity inputEntity : variableRegistry.getVariables()) {
+            final VariableEntity entity = new VariableEntity();
+
+            final VariableDTO inputDto = inputEntity.getVariable();
+            final VariableDTO variableDto = new VariableDTO();
+            variableDto.setName(inputDto.getName());
+            variableDto.setValue(inputDto.getValue());
+            variableDto.setProcessGroupId(group.getIdentifier());
+
+            final Set<ConfiguredComponent> affectedComponents = group.getComponentsAffectedByVariable(variableDto.getName());
+            final Set<AffectedComponentDTO> affectedComponentDtos = affectedComponents.stream()
+                .map(component -> createAffectedComponentDto(component))
+                .collect(Collectors.toSet());
+
+            boolean canWrite = true;
+            for (final ConfiguredComponent component : affectedComponents) {
+                final PermissionsDTO permissions = createPermissionsDto(component);
+                if (!permissions.getCanRead() || !permissions.getCanWrite()) {
+                    canWrite = false;
+                    break;
+                }
+            }
+
+            variableDto.setAffectedComponents(affectedComponentDtos);
+
+            entity.setCanWrite(canWrite);
+            entity.setVariable(inputDto);
+
+            variableEntities.add(entity);
+        }
+
+        final VariableRegistryDTO registryDto = new VariableRegistryDTO();
+        registryDto.setProcessGroupId(group.getIdentifier());
+        registryDto.setVariables(variableEntities);
+
+        return registryDto;
+    }
+
+
     /**
      * Gets the capability description from the specified class.
      */
@@ -3016,6 +3173,10 @@ public final class DtoFactory {
         copy.setActiveRemotePortCount(original.getActiveRemotePortCount());
         copy.setInactiveRemotePortCount(original.getInactiveRemotePortCount());
 
+        if (original.getVariables() != null) {
+            copy.setVariables(new HashMap<>(original.getVariables()));
+        }
+
         return copy;
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java
index 41249ba..a7f370a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java
@@ -65,6 +65,7 @@ import org.apache.nifi.web.api.entity.StatusHistoryEntity;
 import org.apache.nifi.web.api.entity.TenantEntity;
 import org.apache.nifi.web.api.entity.UserEntity;
 import org.apache.nifi.web.api.entity.UserGroupEntity;
+import org.apache.nifi.web.api.entity.VariableRegistryEntity;
 
 import java.util.Date;
 import java.util.List;
@@ -431,6 +432,18 @@ public final class EntityFactory {
         return entity;
     }
 
+    public VariableRegistryEntity createVariableRegistryEntity(final VariableRegistryDTO dto, final RevisionDTO revision, final PermissionsDTO permissions) {
+        final VariableRegistryEntity entity = new VariableRegistryEntity();
+        entity.setProcessGroupRevision(revision);
+        if (dto != null) {
+            if (permissions != null && permissions.getCanRead()) {
+                entity.setVariableRegistry(dto);
+            }
+        }
+
+        return entity;
+    }
+
     public ControllerServiceEntity createControllerServiceEntity(final ControllerServiceDTO dto, final RevisionDTO revision, final PermissionsDTO permissions, final List<BulletinEntity> bulletins) {
         final ControllerServiceEntity entity = new ControllerServiceEntity();
         entity.setRevision(revision);

http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ControllerServiceDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ControllerServiceDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ControllerServiceDAO.java
index fa92425..0409e95 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ControllerServiceDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ControllerServiceDAO.java
@@ -63,7 +63,7 @@ public interface ControllerServiceDAO {
      *
      * @return The controller services
      */
-    Set<ControllerServiceNode> getControllerServices(String groupId);
+    Set<ControllerServiceNode> getControllerServices(String groupId, boolean includeAncestorGroups, boolean includeDescendantGroups);
 
     /**
      * Updates the specified controller service.

http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java
index 155b36e..d7ca806 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java
@@ -16,11 +16,14 @@
  */
 package org.apache.nifi.web.dao;
 
+import java.util.Set;
+import java.util.concurrent.Future;
+
 import org.apache.nifi.controller.ScheduledState;
+import org.apache.nifi.controller.service.ControllerServiceState;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.web.api.dto.ProcessGroupDTO;
-
-import java.util.Set;
+import org.apache.nifi.web.api.dto.VariableRegistryDTO;
 
 public interface ProcessGroupDAO {
 
@@ -65,12 +68,32 @@ public interface ProcessGroupDAO {
     void verifyScheduleComponents(String groupId, ScheduledState state, Set<String> componentIds);
 
     /**
+     * Verifies the specified controller services can be modified
+     *
+     * @param groupId the ID of the process group
+     * @param state the desired state
+     * @param serviceIds the ID's of the controller services
+     */
+    void verifyActivateControllerServices(String groupId, ControllerServiceState state, Set<String> serviceIds);
+
+    /**
      * Schedules the components in the specified process group.
      *
      * @param groupId id
      * @param state scheduled state
+     *
+     * @return a Future that can be used to wait for the services to finish starting or stopping
      */
-    void scheduleComponents(String groupId, ScheduledState state, Set<String> componentIds);
+    Future<Void> scheduleComponents(String groupId, ScheduledState state, Set<String> componentIds);
+
+    /**
+     * Enables or disables the controller services in the specified process group
+     *
+     * @param groupId the id of the group
+     * @param state the desired state
+     * @param serviceIds the ID's of the services to enable or disable
+     */
+    Future<Void> activateControllerServices(String groupId, ControllerServiceState state, Set<String> serviceIds);
 
     /**
      * Updates the specified process group.
@@ -81,6 +104,21 @@ public interface ProcessGroupDAO {
     ProcessGroup updateProcessGroup(ProcessGroupDTO processGroup);
 
     /**
+     * Updates the specified variable registry
+     *
+     * @param variableRegistry the Variable Registry
+     * @return the Process Group that was updated
+     */
+    ProcessGroup updateVariableRegistry(VariableRegistryDTO variableRegistry);
+
+    /**
+     * Verifies that the specified updates to a current Process Group can be applied at this time
+     *
+     * @param processGroup the DTO That describes the changes to occur
+     */
+    void verifyUpdate(ProcessGroupDTO processGroup);
+
+    /**
      * Verifies the specified process group can be removed.
      *
      * @param groupId id

http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
index 36cf85b..0f9ec7a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
@@ -124,7 +124,7 @@ public class StandardControllerServiceDAO extends ComponentDAO implements Contro
     }
 
     @Override
-    public Set<ControllerServiceNode> getControllerServices(final String groupId) {
+    public Set<ControllerServiceNode> getControllerServices(final String groupId, final boolean includeAncestorGroups, final boolean includeDescendantGroups) {
         if (groupId == null) {
             return flowController.getRootControllerServices();
         } else {
@@ -134,7 +134,12 @@ public class StandardControllerServiceDAO extends ComponentDAO implements Contro
                 throw new ResourceNotFoundException("Could not find Process Group with ID " + groupId);
             }
 
-            return procGroup.getControllerServices(true);
+            final Set<ControllerServiceNode> serviceNodes = procGroup.getControllerServices(includeAncestorGroups);
+            if (includeDescendantGroups) {
+                serviceNodes.addAll(procGroup.findAllControllerServices());
+            }
+
+            return serviceNodes;
         }
     }