You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2017/08/17 14:43:20 UTC

[3/9] nifi git commit: NIFI-4224: - Initial implementation of Process Group level Variable Registry - Updated to incorporate PR Feedback - Changed log message because slf4j-simple apparently has a memory leak; passing a String instead of passing in the C

http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 36a9524..35686a5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -16,7 +16,31 @@
  */
 package org.apache.nifi.web;
 
-import com.google.common.collect.Sets;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
+
 import org.apache.nifi.action.Action;
 import org.apache.nifi.action.Component;
 import org.apache.nifi.action.FlowChangeAction;
@@ -84,6 +108,7 @@ import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.history.History;
 import org.apache.nifi.history.HistoryQuery;
 import org.apache.nifi.history.PreviousValue;
+import org.apache.nifi.registry.ComponentVariableRegistry;
 import org.apache.nifi.remote.RootGroupPort;
 import org.apache.nifi.reporting.Bulletin;
 import org.apache.nifi.reporting.BulletinQuery;
@@ -92,6 +117,7 @@ import org.apache.nifi.reporting.ComponentType;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.web.api.dto.AccessPolicyDTO;
 import org.apache.nifi.web.api.dto.AccessPolicySummaryDTO;
+import org.apache.nifi.web.api.dto.AffectedComponentDTO;
 import org.apache.nifi.web.api.dto.BulletinBoardDTO;
 import org.apache.nifi.web.api.dto.BulletinDTO;
 import org.apache.nifi.web.api.dto.BulletinQueryDTO;
@@ -137,6 +163,7 @@ import org.apache.nifi.web.api.dto.SystemDiagnosticsDTO;
 import org.apache.nifi.web.api.dto.TemplateDTO;
 import org.apache.nifi.web.api.dto.UserDTO;
 import org.apache.nifi.web.api.dto.UserGroupDTO;
+import org.apache.nifi.web.api.dto.VariableRegistryDTO;
 import org.apache.nifi.web.api.dto.action.HistoryDTO;
 import org.apache.nifi.web.api.dto.action.HistoryQueryDTO;
 import org.apache.nifi.web.api.dto.flow.FlowDTO;
@@ -157,6 +184,7 @@ import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
 import org.apache.nifi.web.api.entity.AccessPolicyEntity;
 import org.apache.nifi.web.api.entity.AccessPolicySummaryEntity;
 import org.apache.nifi.web.api.entity.ActionEntity;
+import org.apache.nifi.web.api.entity.ActivateControllerServicesEntity;
 import org.apache.nifi.web.api.entity.BulletinEntity;
 import org.apache.nifi.web.api.entity.ComponentReferenceEntity;
 import org.apache.nifi.web.api.entity.ConnectionEntity;
@@ -190,6 +218,8 @@ import org.apache.nifi.web.api.entity.TemplateEntity;
 import org.apache.nifi.web.api.entity.TenantEntity;
 import org.apache.nifi.web.api.entity.UserEntity;
 import org.apache.nifi.web.api.entity.UserGroupEntity;
+import org.apache.nifi.web.api.entity.VariableEntity;
+import org.apache.nifi.web.api.entity.VariableRegistryEntity;
 import org.apache.nifi.web.controller.ControllerFacade;
 import org.apache.nifi.web.dao.AccessPolicyDAO;
 import org.apache.nifi.web.dao.ConnectionDAO;
@@ -217,28 +247,7 @@ import org.apache.nifi.web.util.SnippetUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Response;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
-import java.util.function.Function;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
+import com.google.common.collect.Sets;
 
 /**
  * Implementation of NiFiServiceFacade that performs revision checking.
@@ -423,6 +432,11 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     }
 
     @Override
+    public void verifyActivateControllerServices(final String groupId, final ControllerServiceState state, final Set<String> serviceIds) {
+        processGroupDAO.verifyActivateControllerServices(groupId, state, serviceIds);
+    }
+
+    @Override
     public void verifyDeleteProcessGroup(final String groupId) {
         processGroupDAO.verifyDelete(groupId);
     }
@@ -624,6 +638,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
      */
     private <D, C> RevisionUpdate<D> updateComponent(final Revision revision, final Authorizable authorizable, final Supplier<C> daoUpdate, final Function<C, D> dtoCreation) {
         final NiFiUser user = NiFiUserUtils.getNiFiUser();
+        return updateComponent(user, revision, authorizable, daoUpdate, dtoCreation);
+    }
+
+    private <D, C> RevisionUpdate<D> updateComponent(final NiFiUser user, final Revision revision, final Authorizable authorizable, final Supplier<C> daoUpdate, final Function<C, D> dtoCreation) {
         try {
             final RevisionUpdate<D> updatedComponent = revisionManager.updateRevision(new StandardRevisionClaim(revision), user, new UpdateRevisionTask<D>() {
                 @Override
@@ -774,6 +792,81 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     }
 
     @Override
+    public Set<AffectedComponentDTO> getComponentsAffectedByVariableRegistryUpdate(final VariableRegistryDTO variableRegistryDto) {
+        final ProcessGroup group = processGroupDAO.getProcessGroup(variableRegistryDto.getProcessGroupId());
+        if (group == null) {
+            throw new ResourceNotFoundException("Could not find Process Group with ID " + variableRegistryDto.getProcessGroupId());
+        }
+
+        final Map<String, String> variableMap = new HashMap<>();
+        variableRegistryDto.getVariables().stream() // have to use forEach here instead of using Collectors.toMap because value may be null
+            .map(VariableEntity::getVariable)
+            .forEach(var -> variableMap.put(var.getName(), var.getValue()));
+
+        final Set<AffectedComponentDTO> affectedComponentDtos = new HashSet<>();
+
+        final Set<String> updatedVariableNames = getUpdatedVariables(group, variableMap);
+        for (final String variableName : updatedVariableNames) {
+            final Set<ConfiguredComponent> affectedComponents = group.getComponentsAffectedByVariable(variableName);
+
+            for (final ConfiguredComponent component : affectedComponents) {
+                if (component instanceof ProcessorNode) {
+                    final ProcessorNode procNode = (ProcessorNode) component;
+                    if (procNode.isRunning()) {
+                        affectedComponentDtos.add(dtoFactory.createAffectedComponentDto(procNode));
+                    }
+                } else if (component instanceof ControllerServiceNode) {
+                    final ControllerServiceNode serviceNode = (ControllerServiceNode) component;
+                    if (serviceNode.isActive()) {
+                        affectedComponentDtos.add(dtoFactory.createAffectedComponentDto(serviceNode));
+                    }
+                } else {
+                    throw new RuntimeException("Found unexpected type of Component [" + component.getCanonicalClassName() + "] dependending on variable");
+                }
+            }
+        }
+
+        return affectedComponentDtos;
+    }
+
+    private Set<String> getUpdatedVariables(final ProcessGroup group, final Map<String, String> newVariableValues) {
+        final Set<String> updatedVariableNames = new HashSet<>();
+
+        final ComponentVariableRegistry registry = group.getVariableRegistry();
+        for (final Map.Entry<String, String> entry : newVariableValues.entrySet()) {
+            final String varName = entry.getKey();
+            final String newValue = entry.getValue();
+
+            final String curValue = registry.getVariableValue(varName);
+            if (!Objects.equals(newValue, curValue)) {
+                updatedVariableNames.add(varName);
+            }
+        }
+
+        return updatedVariableNames;
+    }
+
+
+    @Override
+    public VariableRegistryEntity updateVariableRegistry(Revision revision, VariableRegistryDTO variableRegistryDto) {
+        return updateVariableRegistry(NiFiUserUtils.getNiFiUser(), revision, variableRegistryDto);
+    }
+
+    @Override
+    public VariableRegistryEntity updateVariableRegistry(NiFiUser user, Revision revision, VariableRegistryDTO variableRegistryDto) {
+        final ProcessGroup processGroupNode = processGroupDAO.getProcessGroup(variableRegistryDto.getProcessGroupId());
+        final RevisionUpdate<VariableRegistryDTO> snapshot = updateComponent(user, revision,
+            processGroupNode,
+            () -> processGroupDAO.updateVariableRegistry(variableRegistryDto),
+            processGroup -> dtoFactory.createVariableRegistryDto(processGroup));
+
+        final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroupNode);
+        final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification());
+        return entityFactory.createVariableRegistryEntity(snapshot.getComponent(), updatedRevision, permissions);
+    }
+
+
+    @Override
     public ProcessGroupEntity updateProcessGroup(final Revision revision, final ProcessGroupDTO processGroupDTO) {
         final ProcessGroup processGroupNode = processGroupDAO.getProcessGroup(processGroupDTO.getId());
         final RevisionUpdate<ProcessGroupDTO> snapshot = updateComponent(revision,
@@ -790,14 +883,27 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     }
 
     @Override
+    public void verifyUpdateProcessGroup(ProcessGroupDTO processGroupDTO) {
+        if (processGroupDAO.hasProcessGroup(processGroupDTO.getId())) {
+            processGroupDAO.verifyUpdate(processGroupDTO);
+        }
+    }
+
+    @Override
     public ScheduleComponentsEntity scheduleComponents(final String processGroupId, final ScheduledState state, final Map<String, Revision> componentRevisions) {
         final NiFiUser user = NiFiUserUtils.getNiFiUser();
+        return scheduleComponents(user, processGroupId, state, componentRevisions);
+    }
+
+    @Override
+    public ScheduleComponentsEntity scheduleComponents(final NiFiUser user, final String processGroupId, final ScheduledState state, final Map<String, Revision> componentRevisions) {
+
         final RevisionUpdate<ScheduleComponentsEntity> updatedComponent = revisionManager.updateRevision(new StandardRevisionClaim(componentRevisions.values()), user, new
                 UpdateRevisionTask<ScheduleComponentsEntity>() {
                     @Override
                     public RevisionUpdate<ScheduleComponentsEntity> update() {
                         // schedule the components
-                        processGroupDAO.scheduleComponents(processGroupId, state, componentRevisions.keySet());
+                processGroupDAO.scheduleComponents(processGroupId, state, componentRevisions.keySet());
 
                         // update the revisions
                         final Map<String, Revision> updatedRevisions = new HashMap<>();
@@ -821,6 +927,46 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     }
 
     @Override
+    public ActivateControllerServicesEntity activateControllerServices(final String processGroupId, final ControllerServiceState state, final Map<String, Revision> serviceRevisions) {
+
+        final NiFiUser user = NiFiUserUtils.getNiFiUser();
+        return activateControllerServices(user, processGroupId, state, serviceRevisions);
+    }
+
+    @Override
+    public ActivateControllerServicesEntity activateControllerServices(final NiFiUser user, final String processGroupId, final ControllerServiceState state,
+        final Map<String, Revision> serviceRevisions) {
+
+        final RevisionUpdate<ActivateControllerServicesEntity> updatedComponent = revisionManager.updateRevision(new StandardRevisionClaim(serviceRevisions.values()), user,
+            new UpdateRevisionTask<ActivateControllerServicesEntity>() {
+                @Override
+                public RevisionUpdate<ActivateControllerServicesEntity> update() {
+                    // schedule the components
+                    processGroupDAO.activateControllerServices(processGroupId, state, serviceRevisions.keySet());
+
+                    // update the revisions
+                    final Map<String, Revision> updatedRevisions = new HashMap<>();
+                    for (final Revision revision : serviceRevisions.values()) {
+                        final Revision currentRevision = revisionManager.getRevision(revision.getComponentId());
+                        updatedRevisions.put(revision.getComponentId(), currentRevision.incrementRevision(revision.getClientId()));
+                    }
+
+                    // save
+                    controllerFacade.save();
+
+                    // gather details for response
+                    final ActivateControllerServicesEntity entity = new ActivateControllerServicesEntity();
+                    entity.setId(processGroupId);
+                    entity.setState(state.name());
+                    return new StandardRevisionUpdate<>(entity, null, new HashSet<>(updatedRevisions.values()));
+                }
+            });
+
+        return updatedComponent.getComponent();
+    }
+
+
+    @Override
     public ControllerConfigurationEntity updateControllerConfiguration(final Revision revision, final ControllerConfigurationDTO controllerConfigurationDTO) {
         final RevisionUpdate<ControllerConfigurationDTO> updatedComponent = updateComponent(
                 revision,
@@ -3062,7 +3208,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
         return createProcessGroupEntity(processGroup);
     }
 
-    private ControllerServiceEntity createControllerServiceEntity(final ControllerServiceNode serviceNode, final Set<String> serviceIds) {
+    private ControllerServiceEntity createControllerServiceEntity(final ControllerServiceNode serviceNode, final Set<String> serviceIds, final NiFiUser user) {
         final ControllerServiceDTO dto = dtoFactory.createControllerServiceDto(serviceNode);
 
         final ControllerServiceReference ref = serviceNode.getReferences();
@@ -3070,26 +3216,77 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
         dto.setReferencingComponents(referencingComponentsEntity.getControllerServiceReferencingComponents());
 
         final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(serviceNode.getIdentifier()));
-        final PermissionsDTO permissions = dtoFactory.createPermissionsDto(serviceNode);
+        final PermissionsDTO permissions = dtoFactory.createPermissionsDto(serviceNode, user);
         final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(serviceNode.getIdentifier()));
         final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
         return entityFactory.createControllerServiceEntity(dto, revision, permissions, bulletinEntities);
     }
 
     @Override
-    public Set<ControllerServiceEntity> getControllerServices(final String groupId) {
-        final Set<ControllerServiceNode> serviceNodes = controllerServiceDAO.getControllerServices(groupId);
+    public VariableRegistryEntity getVariableRegistry(final String groupId, final boolean includeAncestorGroups) {
+        final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
+        if (processGroup == null) {
+            throw new ResourceNotFoundException("Could not find group with ID " + groupId);
+        }
+
+        return createVariableRegistryEntity(processGroup, includeAncestorGroups);
+    }
+
+    private VariableRegistryEntity createVariableRegistryEntity(final ProcessGroup processGroup, final boolean includeAncestorGroups) {
+        final VariableRegistryDTO registryDto = dtoFactory.createVariableRegistryDto(processGroup);
+        final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(processGroup.getIdentifier()));
+        final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup);
+
+        if (includeAncestorGroups) {
+            ProcessGroup parent = processGroup.getParent();
+            while (parent != null) {
+                final PermissionsDTO parentPerms = dtoFactory.createPermissionsDto(processGroup);
+                if (Boolean.TRUE.equals(parentPerms.getCanRead())) {
+                    final VariableRegistryDTO parentRegistryDto = dtoFactory.createVariableRegistryDto(parent);
+                    final Set<VariableEntity> parentVariables = parentRegistryDto.getVariables();
+                    registryDto.getVariables().addAll(parentVariables);
+                }
+
+                parent = parent.getParent();
+            }
+        }
+
+        return entityFactory.createVariableRegistryEntity(registryDto, revision, permissions);
+    }
+
+    @Override
+    public VariableRegistryEntity populateAffectedComponents(final VariableRegistryDTO variableRegistryDto) {
+        final String groupId = variableRegistryDto.getProcessGroupId();
+        final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
+        if (processGroup == null) {
+            throw new ResourceNotFoundException("Could not find group with ID " + groupId);
+        }
+
+        final VariableRegistryDTO registryDto = dtoFactory.populateAffectedComponents(variableRegistryDto, processGroup);
+        final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(processGroup.getIdentifier()));
+        final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup);
+        return entityFactory.createVariableRegistryEntity(registryDto, revision, permissions);
+    }
+
+    @Override
+    public Set<ControllerServiceEntity> getControllerServices(final String groupId, final boolean includeAncestorGroups, final boolean includeDescendantGroups) {
+        return getControllerServices(groupId, includeAncestorGroups, includeDescendantGroups, NiFiUserUtils.getNiFiUser());
+    }
+
+    @Override
+    public Set<ControllerServiceEntity> getControllerServices(final String groupId, final boolean includeAncestorGroups, final boolean includeDescendantGroups, final NiFiUser user) {
+        final Set<ControllerServiceNode> serviceNodes = controllerServiceDAO.getControllerServices(groupId, includeAncestorGroups, includeDescendantGroups);
         final Set<String> serviceIds = serviceNodes.stream().map(service -> service.getIdentifier()).collect(Collectors.toSet());
 
         return serviceNodes.stream()
-            .map(serviceNode -> createControllerServiceEntity(serviceNode, serviceIds))
+            .map(serviceNode -> createControllerServiceEntity(serviceNode, serviceIds, user))
             .collect(Collectors.toSet());
     }
 
     @Override
     public ControllerServiceEntity getControllerService(final String controllerServiceId) {
         final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(controllerServiceId);
-        return createControllerServiceEntity(controllerService, Sets.newHashSet(controllerServiceId));
+        return createControllerServiceEntity(controllerService, Sets.newHashSet(controllerServiceId), NiFiUserUtils.getNiFiUser());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
index 3d78741..1a50d04 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
@@ -608,6 +608,11 @@ public abstract class ApplicationResource {
             serviceFacade.authorizeAccess(authorizer);
             serviceFacade.verifyRevision(revision, user);
 
+            // verify if necessary
+            if (verifier != null) {
+                verifier.run();
+            }
+
             return action.apply(revision, entity);
         }
     }
@@ -657,6 +662,11 @@ public abstract class ApplicationResource {
             serviceFacade.authorizeAccess(authorizer);
             serviceFacade.verifyRevisions(revisions, user);
 
+            // verify if necessary
+            if (verifier != null) {
+                verifier.run();
+            }
+
             return action.apply(revisions, entity);
         }
     }
@@ -820,16 +830,16 @@ public abstract class ApplicationResource {
         }
     }
 
-    /**
-     * Replicates the request to the given node
-     *
-     * @param method   the HTTP method
-     * @param nodeUuid the UUID of the node to replicate the request to
-     * @return the response from the node
-     * @throws UnknownNodeException if the nodeUuid given does not map to any node in the cluster
-     */
-    protected Response replicate(final String method, final String nodeUuid) {
-        return replicate(method, getRequestParameters(), nodeUuid);
+
+    private void ensureFlowInitialized() {
+        if (!flowController.isInitialized()) {
+            throw new IllegalClusterStateException("Cluster is still in the process of voting on the appropriate Data Flow.");
+        }
+    }
+
+    protected Response replicate(final String method, final Object entity, final String nodeUuid, final Map<String, String> headersToOverride) {
+        final URI path = getAbsolutePath();
+        return replicate(path, method, entity, nodeUuid, headersToOverride);
     }
 
     /**
@@ -845,22 +855,16 @@ public abstract class ApplicationResource {
         return replicate(method, entity, nodeUuid, null);
     }
 
-    private void ensureFlowInitialized() {
-        if (!flowController.isInitialized()) {
-            throw new IllegalClusterStateException("Cluster is still in the process of voting on the appropriate Data Flow.");
-        }
-    }
-
     /**
      * Replicates the request to the given node
      *
-     * @param method   the HTTP method
-     * @param entity   the Entity to replicate
+     * @param method the HTTP method
+     * @param entity the Entity to replicate
      * @param nodeUuid the UUID of the node to replicate the request to
      * @return the response from the node
      * @throws UnknownNodeException if the nodeUuid given does not map to any node in the cluster
      */
-    protected Response replicate(final String method, final Object entity, final String nodeUuid, final Map<String, String> headersToOverride) {
+    protected Response replicate(final URI path, final String method, final Object entity, final String nodeUuid, final Map<String, String> headersToOverride) {
         // since we're cluster we must specify the cluster node identifier
         if (nodeUuid == null) {
             throw new IllegalArgumentException("The cluster node identifier must be specified.");
@@ -873,7 +877,6 @@ public abstract class ApplicationResource {
 
         ensureFlowInitialized();
 
-        final URI path = getAbsolutePath();
         try {
             final Map<String, String> headers = headersToOverride == null ? getHeaders() : getHeaders(headersToOverride);
 
@@ -996,6 +999,12 @@ public abstract class ApplicationResource {
         }
     }
 
+
+    protected NodeResponse replicateNodeResponse(final String method, final Object entity, final Map<String, String> headersToOverride) throws InterruptedException {
+        final URI path = getAbsolutePath();
+        return replicateNodeResponse(path, method, entity, headersToOverride);
+    }
+
     /**
      * Replicates the request to all nodes in the cluster using the provided method and entity. The headers
      * used will be those provided by the {@link #getHeaders()} method. The URI that will be used will be
@@ -1009,10 +1018,9 @@ public abstract class ApplicationResource {
      * @throws InterruptedException if interrupted while replicating the request
      * @see #replicate(String, Object, Map)
      */
-    protected NodeResponse replicateNodeResponse(final String method, final Object entity, final Map<String, String> headersToOverride) throws InterruptedException {
+    protected NodeResponse replicateNodeResponse(final URI path, final String method, final Object entity, final Map<String, String> headersToOverride) throws InterruptedException {
         ensureFlowInitialized();
 
-        final URI path = getAbsolutePath();
         final Map<String, String> headers = headersToOverride == null ? getHeaders() : getHeaders(headersToOverride);
 
         // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly

http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
index 889676c..5d5a796 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
@@ -36,6 +36,8 @@ import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
 import org.apache.nifi.cluster.manager.NodeResponse;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.controller.ScheduledState;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.service.ControllerServiceState;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.nar.NarClassLoaders;
 import org.apache.nifi.util.NiFiProperties;
@@ -61,6 +63,7 @@ import org.apache.nifi.web.api.dto.search.SearchResultsDTO;
 import org.apache.nifi.web.api.dto.status.ControllerStatusDTO;
 import org.apache.nifi.web.api.entity.AboutEntity;
 import org.apache.nifi.web.api.entity.ActionEntity;
+import org.apache.nifi.web.api.entity.ActivateControllerServicesEntity;
 import org.apache.nifi.web.api.entity.BannerEntity;
 import org.apache.nifi.web.api.entity.BulletinBoardEntity;
 import org.apache.nifi.web.api.entity.ClusteSummaryEntity;
@@ -118,6 +121,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 /**
@@ -388,7 +392,7 @@ public class FlowResource extends ApplicationResource {
         }
 
         // get all the controller services
-        final Set<ControllerServiceEntity> controllerServices = serviceFacade.getControllerServices(null);
+        final Set<ControllerServiceEntity> controllerServices = serviceFacade.getControllerServices(null, false, false);
         controllerServiceResource.populateRemainingControllerServiceEntitiesContent(controllerServices);
 
         // create the response entity
@@ -426,11 +430,10 @@ public class FlowResource extends ApplicationResource {
             }
     )
     public Response getControllerServicesFromGroup(
-            @ApiParam(
-                    value = "The process group id.",
-                    required = true
-            )
-            @PathParam("id") String groupId) throws InterruptedException {
+            @ApiParam(value = "The process group id.", required = true) @PathParam("id") String groupId,
+            @ApiParam("Whether or not to include parent/ancestory process groups") @QueryParam("includeAncestorGroups") @DefaultValue("true") boolean includeAncestorGroups,
+            @ApiParam("Whether or not to include descendant process groups") @QueryParam("includeDescendantGroups") @DefaultValue("false") boolean includeDescendantGroups
+            ) throws InterruptedException {
 
         authorizeFlow();
 
@@ -439,7 +442,7 @@ public class FlowResource extends ApplicationResource {
         }
 
         // get all the controller services
-        final Set<ControllerServiceEntity> controllerServices = serviceFacade.getControllerServices(groupId);
+        final Set<ControllerServiceEntity> controllerServices = serviceFacade.getControllerServices(groupId, includeAncestorGroups, includeDescendantGroups);
         controllerServiceResource.populateRemainingControllerServiceEntitiesContent(controllerServices);
 
         // create the response entity
@@ -512,7 +515,7 @@ public class FlowResource extends ApplicationResource {
     @Produces(MediaType.APPLICATION_JSON)
     @Path("process-groups/{id}")
     @ApiOperation(
-            value = "Schedule or unschedule comopnents in the specified Process Group.",
+            value = "Schedule or unschedule components in the specified Process Group.",
             response = ScheduleComponentsEntity.class,
             authorizations = {
                     @Authorization(value = "Read - /flow", type = ""),
@@ -570,7 +573,7 @@ public class FlowResource extends ApplicationResource {
 
                 // ensure authorized for each processor we will attempt to schedule
                 group.findAllProcessors().stream()
-                        .filter(ScheduledState.RUNNING.equals(state) ? ProcessGroup.SCHEDULABLE_PROCESSORS : ProcessGroup.UNSCHEDULABLE_PROCESSORS)
+                    .filter(ScheduledState.RUNNING.equals(state) ? ProcessGroup.SCHEDULABLE_PROCESSORS : ProcessGroup.UNSCHEDULABLE_PROCESSORS)
                         .filter(processor -> processor.isAuthorized(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()))
                         .forEach(processor -> {
                             componentIds.add(processor.getIdentifier());
@@ -578,7 +581,7 @@ public class FlowResource extends ApplicationResource {
 
                 // ensure authorized for each input port we will attempt to schedule
                 group.findAllInputPorts().stream()
-                        .filter(ScheduledState.RUNNING.equals(state) ? ProcessGroup.SCHEDULABLE_PORTS : ProcessGroup.UNSCHEDULABLE_PORTS)
+                    .filter(ScheduledState.RUNNING.equals(state) ? ProcessGroup.SCHEDULABLE_PORTS : ProcessGroup.UNSCHEDULABLE_PORTS)
                         .filter(inputPort -> inputPort.isAuthorized(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()))
                         .forEach(inputPort -> {
                             componentIds.add(inputPort.getIdentifier());
@@ -586,7 +589,7 @@ public class FlowResource extends ApplicationResource {
 
                 // ensure authorized for each output port we will attempt to schedule
                 group.findAllOutputPorts().stream()
-                        .filter(ScheduledState.RUNNING.equals(state) ? ProcessGroup.SCHEDULABLE_PORTS : ProcessGroup.UNSCHEDULABLE_PORTS)
+                    .filter(ScheduledState.RUNNING.equals(state) ? ProcessGroup.SCHEDULABLE_PORTS : ProcessGroup.UNSCHEDULABLE_PORTS)
                         .filter(outputPort -> outputPort.isAuthorized(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()))
                         .forEach(outputPort -> {
                             componentIds.add(outputPort.getIdentifier());
@@ -640,7 +643,129 @@ public class FlowResource extends ApplicationResource {
                             componentsToSchedule.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> getRevision(e.getValue(), e.getKey())));
 
                     // update the process group
-                    final ScheduleComponentsEntity entity = serviceFacade.scheduleComponents(id, scheduledState, componentRevisions);
+                final ScheduleComponentsEntity entity = serviceFacade.scheduleComponents(id, scheduledState, componentRevisions);
+                    return generateOkResponse(entity).build();
+                }
+        );
+    }
+
+
+    @PUT
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("process-groups/{id}/controller-services")
+    @ApiOperation(value = "Enable or disable Controller Services in the specified Process Group.",
+        response = ActivateControllerServicesEntity.class,
+        authorizations = {
+            @Authorization(value = "Read - /flow", type = ""),
+            @Authorization(value = "Write - /{component-type}/{uuid} - For every service being enabled/disabled", type = "")
+        })
+    @ApiResponses(
+            value = {
+                    @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+                    @ApiResponse(code = 401, message = "Client could not be authenticated."),
+                    @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+                    @ApiResponse(code = 404, message = "The specified resource could not be found."),
+                    @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+            }
+    )
+    public Response activateControllerServices(
+            @Context HttpServletRequest httpServletRequest,
+            @ApiParam(value = "The process group id.", required = true)
+            @PathParam("id") String id,
+            @ApiParam(value = "The request to schedule or unschedule. If the comopnents in the request are not specified, all authorized components will be considered.", required = true)
+            final ActivateControllerServicesEntity requestEntity) {
+
+        // ensure the same id is being used
+        if (!id.equals(requestEntity.getId())) {
+            throw new IllegalArgumentException(String.format("The process group id (%s) in the request body does "
+                + "not equal the process group id of the requested resource (%s).", requestEntity.getId(), id));
+        }
+
+        final ControllerServiceState state;
+        if (requestEntity.getState() == null) {
+            throw new IllegalArgumentException("The controller service state must be specified.");
+        } else {
+            try {
+                state = ControllerServiceState.valueOf(requestEntity.getState());
+            } catch (final IllegalArgumentException iae) {
+                throw new IllegalArgumentException(String.format("The controller service state must be one of [%s].",
+                    StringUtils.join(EnumSet.of(ControllerServiceState.ENABLED, ControllerServiceState.DISABLED), ", ")));
+            }
+        }
+
+        // ensure its a supported scheduled state
+        if (ControllerServiceState.DISABLING.equals(state) || ControllerServiceState.ENABLING.equals(state)) {
+            throw new IllegalArgumentException(String.format("The scheduled must be one of [%s].",
+                StringUtils.join(EnumSet.of(ControllerServiceState.ENABLED, ControllerServiceState.DISABLED), ", ")));
+        }
+
+        // if the components are not specified, gather all components and their current revision
+        if (requestEntity.getComponents() == null) {
+            // get the current revisions for the components being updated
+            final Set<Revision> revisions = serviceFacade.getRevisionsFromGroup(id, group -> {
+                final Set<String> componentIds = new HashSet<>();
+
+                final Predicate<ControllerServiceNode> filter;
+                if (ControllerServiceState.ENABLED.equals(state)) {
+                    filter = service -> !service.isActive() && service.isValid();
+                } else {
+                    filter = service -> service.isActive();
+                }
+
+                group.findAllControllerServices().stream()
+                    .filter(filter)
+                    .filter(service -> service.isAuthorized(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()))
+                    .forEach(service -> componentIds.add(service.getIdentifier()));
+                return componentIds;
+            });
+
+            // build the component mapping
+            final Map<String, RevisionDTO> componentsToSchedule = new HashMap<>();
+            revisions.forEach(revision -> {
+                final RevisionDTO dto = new RevisionDTO();
+                dto.setClientId(revision.getClientId());
+                dto.setVersion(revision.getVersion());
+                componentsToSchedule.put(revision.getComponentId(), dto);
+            });
+
+            // set the components and their current revision
+            requestEntity.setComponents(componentsToSchedule);
+        }
+
+        if (isReplicateRequest()) {
+            return replicate(HttpMethod.PUT, requestEntity);
+        }
+
+        final Map<String, RevisionDTO> requestComponentsToSchedule = requestEntity.getComponents();
+        final Map<String, Revision> requestComponentRevisions =
+                requestComponentsToSchedule.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> getRevision(e.getValue(), e.getKey())));
+        final Set<Revision> requestRevisions = new HashSet<>(requestComponentRevisions.values());
+
+        return withWriteLock(
+                serviceFacade,
+                requestEntity,
+                requestRevisions,
+                lookup -> {
+                    // ensure access to the flow
+                    authorizeFlow();
+
+                    // ensure access to every component being scheduled
+                    requestComponentsToSchedule.keySet().forEach(componentId -> {
+                        final Authorizable authorizable = lookup.getControllerService(componentId).getAuthorizable();
+                        authorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
+                    });
+                },
+                () -> serviceFacade.verifyActivateControllerServices(id, state, requestComponentRevisions.keySet()),
+                (revisions, scheduleComponentsEntity) -> {
+                final ControllerServiceState serviceState = ControllerServiceState.valueOf(scheduleComponentsEntity.getState());
+
+                    final Map<String, RevisionDTO> componentsToSchedule = scheduleComponentsEntity.getComponents();
+                    final Map<String, Revision> componentRevisions =
+                            componentsToSchedule.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> getRevision(e.getValue(), e.getKey())));
+
+                    // update the controller services
+                final ActivateControllerServicesEntity entity = serviceFacade.activateControllerServices(id, serviceState, componentRevisions);
                     return generateOkResponse(entity).build();
                 }
         );