You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2017/08/17 14:43:19 UTC
[2/9] nifi git commit: NIFI-4224: - Initial implementation of Process
Group level Variable Registry - Updated to incorporate PR Feedback - Changed
log message because slf4j-simple apparently has a memory leak;
passing a String instead of passing in the C
http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
index a9167ae..0b634a1 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
@@ -16,14 +16,54 @@
*/
package org.apache.nifi.web.api;
-import com.sun.jersey.api.core.ResourceContext;
-import com.sun.jersey.multipart.FormDataParam;
-import com.wordnik.swagger.annotations.Api;
-import com.wordnik.swagger.annotations.ApiOperation;
-import com.wordnik.swagger.annotations.ApiParam;
-import com.wordnik.swagger.annotations.ApiResponse;
-import com.wordnik.swagger.annotations.ApiResponses;
-import com.wordnik.swagger.annotations.Authorization;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import javax.ws.rs.core.UriBuilder;
+import javax.ws.rs.core.UriInfo;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.transform.stream.StreamSource;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.AuthorizableLookup;
import org.apache.nifi.authorization.AuthorizeControllerServiceReference;
@@ -38,26 +78,41 @@ import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.connectable.ConnectableType;
+import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.serialization.FlowEncodingVersion;
+import org.apache.nifi.controller.service.ControllerServiceState;
+import org.apache.nifi.framework.security.util.SslContextFactory;
+import org.apache.nifi.registry.variable.VariableRegistryUpdateRequest;
+import org.apache.nifi.registry.variable.VariableRegistryUpdateStep;
import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
import org.apache.nifi.util.BundleUtils;
+import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.Revision;
+import org.apache.nifi.web.api.dto.AffectedComponentDTO;
import org.apache.nifi.web.api.dto.BundleDTO;
import org.apache.nifi.web.api.dto.ConnectionDTO;
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
+import org.apache.nifi.web.api.dto.DtoFactory;
import org.apache.nifi.web.api.dto.FlowSnippetDTO;
import org.apache.nifi.web.api.dto.PositionDTO;
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
+import org.apache.nifi.web.api.dto.RevisionDTO;
import org.apache.nifi.web.api.dto.TemplateDTO;
+import org.apache.nifi.web.api.dto.VariableRegistryDTO;
import org.apache.nifi.web.api.dto.flow.FlowDTO;
+import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO;
+import org.apache.nifi.web.api.dto.status.ProcessGroupStatusSnapshotDTO;
+import org.apache.nifi.web.api.entity.ActivateControllerServicesEntity;
import org.apache.nifi.web.api.entity.ConnectionEntity;
import org.apache.nifi.web.api.entity.ConnectionsEntity;
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
+import org.apache.nifi.web.api.entity.ControllerServicesEntity;
import org.apache.nifi.web.api.entity.CopySnippetRequestEntity;
import org.apache.nifi.web.api.entity.CreateTemplateRequestEntity;
import org.apache.nifi.web.api.entity.FlowEntity;
@@ -70,47 +125,36 @@ import org.apache.nifi.web.api.entity.LabelsEntity;
import org.apache.nifi.web.api.entity.OutputPortsEntity;
import org.apache.nifi.web.api.entity.PortEntity;
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
+import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity;
+import org.apache.nifi.web.api.entity.ProcessGroupStatusSnapshotEntity;
import org.apache.nifi.web.api.entity.ProcessGroupsEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.apache.nifi.web.api.entity.ProcessorsEntity;
import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
import org.apache.nifi.web.api.entity.RemoteProcessGroupsEntity;
+import org.apache.nifi.web.api.entity.ScheduleComponentsEntity;
import org.apache.nifi.web.api.entity.TemplateEntity;
+import org.apache.nifi.web.api.entity.VariableRegistryEntity;
+import org.apache.nifi.web.api.entity.VariableRegistryUpdateRequestEntity;
import org.apache.nifi.web.api.request.ClientIdParameter;
import org.apache.nifi.web.api.request.LongParameter;
+import org.apache.nifi.web.util.Pause;
+import org.apache.nifi.web.util.WebUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DELETE;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.GET;
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.POST;
-import javax.ws.rs.PUT;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.UriBuilder;
-import javax.ws.rs.core.UriInfo;
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBElement;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Unmarshaller;
-import javax.xml.transform.stream.StreamSource;
-import java.io.InputStream;
-import java.net.URI;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Consumer;
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.api.core.ResourceContext;
+import com.sun.jersey.multipart.FormDataParam;
+import com.wordnik.swagger.annotations.Api;
+import com.wordnik.swagger.annotations.ApiOperation;
+import com.wordnik.swagger.annotations.ApiParam;
+import com.wordnik.swagger.annotations.ApiResponse;
+import com.wordnik.swagger.annotations.ApiResponses;
+import com.wordnik.swagger.annotations.Authorization;
/**
* RESTful endpoint for managing a Group.
@@ -139,6 +183,22 @@ public class ProcessGroupResource extends ApplicationResource {
private ConnectionResource connectionResource;
private TemplateResource templateResource;
private ControllerServiceResource controllerServiceResource;
+ private DtoFactory dtoFactory;
+
+    // Capacity of the bounded work queue that backs the Variable Registry update thread pool.
+    private static final int MAX_VARIABLE_REGISTRY_UPDATE_REQUESTS = 100;
+    // One minute, expressed in milliseconds.
+    private static final long VARIABLE_REGISTRY_UPDATE_REQUEST_EXPIRATION = TimeUnit.MINUTES.toMillis(1L);
+
+    // Variable Registry Update Requests, looked up by the update request's identifier.
+    private final ConcurrentMap<String, VariableRegistryUpdateRequest> varRegistryUpdateRequests = new ConcurrentHashMap<>();
+
+    // Pool that runs Variable Registry updates in the background: 1 core thread, at most 50 threads,
+    // idle threads above the core size are reclaimed after 5 seconds. All threads are daemon threads.
+    private final ExecutorService variableRegistryThreadPool = new ThreadPoolExecutor(
+        1, 50, 5L, TimeUnit.SECONDS,
+        new ArrayBlockingQueue<Runnable>(MAX_VARIABLE_REGISTRY_UPDATE_REQUESTS),
+        new ThreadFactory() {
+            @Override
+            public Thread newThread(final Runnable task) {
+                final Thread worker = Executors.defaultThreadFactory().newThread(task);
+                worker.setDaemon(true);
+                worker.setName("Variable Registry Update Thread");
+                return worker;
+            }
+        });
/**
* Populates the remaining fields in the specified process groups.
@@ -164,6 +224,7 @@ public class ProcessGroupResource extends ApplicationResource {
return processGroupEntity;
}
+
/**
* Populates the remaining content of the specified snippet.
*/
@@ -238,6 +299,49 @@ public class ProcessGroupResource extends ApplicationResource {
return generateOkResponse(entity).build();
}
+
+    /**
+     * Returns the Variable Registry of the Process Group with the given ID.
+     *
+     * @param groupId the ID of the Process Group
+     * @param includeAncestorGroups whether or not to include ancestor groups
+     * @return the Variable Registry for the group
+     */
+    @GET
+    @Consumes(MediaType.WILDCARD)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("{id}/variable-registry")
+    @ApiOperation(
+        value = "Gets a process group's variable registry",
+        response = VariableRegistryEntity.class,
+        notes = NON_GUARANTEED_ENDPOINT,
+        authorizations = {
+            @Authorization(value = "Read - /process-groups/{uuid}", type = "")
+        })
+    @ApiResponses(value = {
+        @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+        @ApiResponse(code = 401, message = "Client could not be authenticated."),
+        @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+        @ApiResponse(code = 404, message = "The specified resource could not be found."),
+        @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+    })
+    public Response getVariableRegistry(
+        @ApiParam(value = "The process group id.", required = true) @PathParam("id") final String groupId,
+        @ApiParam(value = "Whether or not to include ancestor groups", required = false) @QueryParam("includeAncestorGroups") @DefaultValue("true") final boolean includeAncestorGroups) {
+
+        // In a cluster, hand the request off for replication instead of answering locally.
+        if (isReplicateRequest()) {
+            return replicate(HttpMethod.GET);
+        }
+
+        // The caller must be allowed to read the process group.
+        serviceFacade.authorizeAccess(lookup ->
+            lookup.getProcessGroup(groupId).getAuthorizable().authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()));
+
+        // Look up the registry and wrap it in a 200 response.
+        final VariableRegistryEntity registryEntity = serviceFacade.getVariableRegistry(groupId, includeAncestorGroups);
+        return generateOkResponse(registryEntity).build();
+    }
+
+
/**
* Updates the specified process group.
*
@@ -314,7 +418,7 @@ public class ProcessGroupResource extends ApplicationResource {
Authorizable authorizable = lookup.getProcessGroup(id).getAuthorizable();
authorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
},
- null,
+ () -> serviceFacade.verifyUpdateProcessGroup(requestProcessGroupDTO),
(revision, processGroupEntity) -> {
// update the process group
final ProcessGroupEntity entity = serviceFacade.updateProcessGroup(revision, processGroupEntity.getComponent());
@@ -325,6 +429,854 @@ public class ProcessGroupResource extends ApplicationResource {
);
}
+
+    /**
+     * Retrieves the Variable Registry Update Request with the given ID for the given Process Group.
+     *
+     * @param groupId the ID of the Process Group
+     * @param updateId the ID of the Variable Registry Update Request
+     * @return the current state of the Variable Registry Update Request
+     */
+    @GET
+    @Consumes(MediaType.WILDCARD)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("{groupId}/variable-registry/update-requests/{updateId}")
+    @ApiOperation(value = "Gets a process group's variable registry update request",
+        response = VariableRegistryUpdateRequestEntity.class,
+        notes = NON_GUARANTEED_ENDPOINT,
+        authorizations = {
+            @Authorization(value = "Read - /process-groups/{uuid}", type = "")
+        })
+    @ApiResponses(value = {
+        @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+        @ApiResponse(code = 401, message = "Client could not be authenticated."),
+        @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+        @ApiResponse(code = 404, message = "The specified resource could not be found."),
+        @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+    })
+    public Response getVariableRegistryUpdateRequest(
+        @ApiParam(value = "The process group id.", required = true) @PathParam("groupId") final String groupId,
+        @ApiParam(value = "The ID of the Variable Registry Update Request", required = true) @PathParam("updateId") final String updateId) {
+
+        if (groupId == null || updateId == null) {
+            throw new IllegalArgumentException("Group ID and Update ID must both be specified.");
+        }
+
+        if (isReplicateRequest()) {
+            return replicate(HttpMethod.GET);
+        }
+
+        // authorize access
+        serviceFacade.authorizeAccess(lookup -> {
+            final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable();
+            processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
+        });
+
+        final VariableRegistryUpdateRequest request = varRegistryUpdateRequests.get(updateId);
+        if (request == null) {
+            throw new ResourceNotFoundException("Could not find a Variable Registry Update Request with identifier " + updateId);
+        }
+
+        // An update request is only visible through the Process Group it was submitted for.
+        if (!groupId.equals(request.getProcessGroupId())) {
+            throw new ResourceNotFoundException("Could not find a Variable Registry Update Request with identifier " + updateId + " for Process Group with identifier " + groupId);
+        }
+
+        final VariableRegistryUpdateRequestEntity entity = new VariableRegistryUpdateRequestEntity();
+        entity.setRequestDto(dtoFactory.createVariableRegistryUpdateRequestDto(request));
+        // The URI must include the "update-requests" segment so that it matches this endpoint's @Path
+        // and the URI handed out when the request was submitted.
+        entity.getRequestDto().setUri(generateResourceUri("process-groups", groupId, "variable-registry", "update-requests", updateId));
+        return generateOkResponse(entity).build();
+    }
+
+
+    /**
+     * Deletes the Variable Registry Update Request with the given ID for the given Process Group,
+     * cancelling it as part of the deletion.
+     *
+     * @param groupId the ID of the Process Group
+     * @param updateId the ID of the Variable Registry Update Request
+     * @return the state of the Variable Registry Update Request that was deleted
+     */
+    @DELETE
+    @Consumes(MediaType.WILDCARD)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("{groupId}/variable-registry/update-requests/{updateId}")
+    @ApiOperation(value = "Deletes an update request for a process group's variable registry. If the request is not yet complete, it will automatically be cancelled.",
+        response = VariableRegistryUpdateRequestEntity.class,
+        notes = NON_GUARANTEED_ENDPOINT,
+        authorizations = {
+            @Authorization(value = "Read - /process-groups/{uuid}", type = ""),
+            @Authorization(value = "Write - /process-groups/{uuid}", type = "")
+        })
+    @ApiResponses(value = {
+        @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+        @ApiResponse(code = 401, message = "Client could not be authenticated."),
+        @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+        @ApiResponse(code = 404, message = "The specified resource could not be found."),
+        @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+    })
+    public Response deleteVariableRegistryUpdateRequest(
+        @ApiParam(value = "The process group id.", required = true) @PathParam("groupId") final String groupId,
+        @ApiParam(value = "The ID of the Variable Registry Update Request", required = true) @PathParam("updateId") final String updateId) {
+
+        if (groupId == null || updateId == null) {
+            throw new IllegalArgumentException("Group ID and Update ID must both be specified.");
+        }
+
+        if (isReplicateRequest()) {
+            return replicate(HttpMethod.DELETE);
+        }
+
+        // authorize access
+        serviceFacade.authorizeAccess(lookup -> {
+            final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable();
+            processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
+            processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
+        });
+
+        // Look the request up first and verify that it belongs to this Process Group BEFORE removing it.
+        // Removing up front would drop (without cancelling) a request that was addressed through the wrong group.
+        final VariableRegistryUpdateRequest request = varRegistryUpdateRequests.get(updateId);
+        if (request == null) {
+            throw new ResourceNotFoundException("Could not find a Variable Registry Update Request with identifier " + updateId);
+        }
+
+        if (!groupId.equals(request.getProcessGroupId())) {
+            throw new ResourceNotFoundException("Could not find a Variable Registry Update Request with identifier " + updateId + " for Process Group with identifier " + groupId);
+        }
+
+        // Remove only the instance that was just validated; if it is already gone, another caller deleted it concurrently.
+        if (!varRegistryUpdateRequests.remove(updateId, request)) {
+            throw new ResourceNotFoundException("Could not find a Variable Registry Update Request with identifier " + updateId);
+        }
+
+        request.cancel();
+
+        final VariableRegistryUpdateRequestEntity entity = new VariableRegistryUpdateRequestEntity();
+        entity.setRequestDto(dtoFactory.createVariableRegistryUpdateRequestDto(request));
+        // The URI must include the "update-requests" segment so that it matches this endpoint's @Path.
+        entity.getRequestDto().setUri(generateResourceUri("process-groups", groupId, "variable-registry", "update-requests", updateId));
+        return generateOkResponse(entity).build();
+    }
+
+
+ @PUT
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("{id}/variable-registry")
+ @ApiOperation(value = "Updates the contents of a Process Group's variable Registry", response = VariableRegistryEntity.class, notes = NON_GUARANTEED_ENDPOINT, authorizations = {
+ @Authorization(value = "Write - /process-groups/{uuid}", type = "")
+ })
+ @ApiResponses(value = {
+ @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+ @ApiResponse(code = 401, message = "Client could not be authenticated."),
+ @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+ @ApiResponse(code = 404, message = "The specified resource could not be found."),
+ @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+ })
+ public Response updateVariableRegistry(
+ @Context final HttpServletRequest httpServletRequest,
+ @ApiParam(value = "The process group id.", required = true) @PathParam("id") final String groupId,
+ @ApiParam(value = "The process group configuration details.", required = true) final VariableRegistryEntity requestEntity) {
+
+ if (requestEntity == null || requestEntity.getVariableRegistry() == null) {
+ throw new IllegalArgumentException("Variable Registry details must be specified.");
+ }
+
+ if (requestEntity.getProcessGroupRevision() == null) {
+ throw new IllegalArgumentException("Process Group Revision must be specified.");
+ }
+
+ // ensure the same id is being used
+ final VariableRegistryDTO registryDto = requestEntity.getVariableRegistry();
+ if (!groupId.equals(registryDto.getProcessGroupId())) {
+ throw new IllegalArgumentException(String.format("The process group id (%s) in the request body does "
+ + "not equal the process group id of the requested resource (%s).", registryDto.getProcessGroupId(), groupId));
+ }
+
+ if (isReplicateRequest()) {
+ return replicate(HttpMethod.PUT, requestEntity);
+ }
+
+ // handle expects request (usually from the cluster manager)
+ final Revision requestRevision = getRevision(requestEntity.getProcessGroupRevision(), groupId);
+ return withWriteLock(
+ serviceFacade,
+ requestEntity,
+ requestRevision,
+ lookup -> {
+ Authorizable authorizable = lookup.getProcessGroup(groupId).getAuthorizable();
+ authorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
+ },
+ null,
+ (revision, processGroupEntity) -> {
+ // update the process group
+ final VariableRegistryEntity entity = serviceFacade.updateVariableRegistry(revision, registryDto);
+ return generateOkResponse(entity).build();
+ });
+ }
+
+
+    /**
+     * Submits a request to update the variable registry for the specified process group. When the request is to be
+     * replicated, the update is carried out in the background and a 202 (Accepted) response that describes the
+     * update request is returned; otherwise the update is performed while holding the write lock.
+     *
+     * @param httpServletRequest request
+     * @param groupId The id of the process group.
+     * @param requestEntity the Variable Registry Entity
+     * @return A Variable Registry Update Request Entity.
+     */
+    @POST
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("{id}/variable-registry/update-requests")
+    @ApiOperation(value = "Submits a request to update a process group's variable registry",
+        response = VariableRegistryUpdateRequestEntity.class,
+        notes = NON_GUARANTEED_ENDPOINT,
+        authorizations = {
+            @Authorization(value = "Write - /process-groups/{uuid}", type = "")
+        })
+    @ApiResponses(value = {
+        @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+        @ApiResponse(code = 401, message = "Client could not be authenticated."),
+        @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+        @ApiResponse(code = 404, message = "The specified resource could not be found."),
+        @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+    })
+    public Response submitUpdateVariableRegistryRequest(
+        @Context final HttpServletRequest httpServletRequest,
+        @ApiParam(value = "The process group id.", required = true) @PathParam("id") final String groupId,
+        @ApiParam(value = "The process group configuration details.", required = true) final VariableRegistryEntity requestEntity) {
+
+        if (requestEntity == null || requestEntity.getVariableRegistry() == null) {
+            throw new IllegalArgumentException("Variable Registry details must be specified.");
+        }
+
+        if (requestEntity.getProcessGroupRevision() == null) {
+            throw new IllegalArgumentException("Process Group Revision must be specified.");
+        }
+
+        // ensure the same id is being used: authorization and the revision are resolved from the id in the URI,
+        // while the affected components are resolved from the request body, so the two must agree.
+        final VariableRegistryDTO requestRegistryDto = requestEntity.getVariableRegistry();
+        if (!groupId.equals(requestRegistryDto.getProcessGroupId())) {
+            throw new IllegalArgumentException(String.format("The process group id (%s) in the request body does "
+                + "not equal the process group id of the requested resource (%s).", requestRegistryDto.getProcessGroupId(), groupId));
+        }
+
+        // In order to update variables in a variable registry, we have to perform the following steps:
+        // 1. Determine Affected Components (this includes any Processors and Controller Services and any components that reference an affected Controller Service).
+        //    1a. Determine ID's of components
+        //    1b. Determine Revision's of associated components
+        // 2. Stop All Affected Processors
+        // 3. Disable All Affected Controller Services
+        // 4. Update the Variables
+        // 5. Re-Enable all Affected Controller Services (services only, not dependent components)
+        // 6. Re-Enable all Processors that Depended on the Controller Services
+
+        // Determine the affected components (and their associated revisions)
+        final VariableRegistryEntity computedEntity = serviceFacade.populateAffectedComponents(requestRegistryDto);
+        final VariableRegistryDTO computedRegistryDto = computedEntity.getVariableRegistry();
+        if (computedRegistryDto == null) {
+            throw new ResourceNotFoundException(String.format("Unable to locate group with id '%s'.", groupId));
+        }
+
+        final Set<AffectedComponentDTO> affectedComponents = serviceFacade.getComponentsAffectedByVariableRegistryUpdate(requestRegistryDto);
+
+        final Map<String, List<AffectedComponentDTO>> affectedComponentsByType = affectedComponents.stream()
+            .collect(Collectors.groupingBy(comp -> comp.getComponentType()));
+
+        // Either list is null when no component of that type is affected.
+        final List<AffectedComponentDTO> affectedProcessors = affectedComponentsByType.get(AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR);
+        final List<AffectedComponentDTO> affectedServices = affectedComponentsByType.get(AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE);
+
+        if (isReplicateRequest()) {
+            // The background task below acts on the flow on the user's behalf, so the user must be
+            // authorized here, before any work is started.
+            serviceFacade.authorizeAccess(lookup -> authorizeVariableRegistryUpdate(lookup, groupId, affectedProcessors, affectedServices));
+
+            // update the variable registry
+            final VariableRegistryUpdateRequest updateRequest = createVariableRegistryUpdateRequest(groupId);
+            updateRequest.getIdentifyRelevantComponentsStep().setComplete(true);
+            final URI originalUri = getAbsolutePath();
+
+            // Submit the task to be run in the background
+            final Runnable taskWrapper = () -> {
+                try {
+                    updateVariableRegistryReplicated(groupId, originalUri, affectedProcessors, affectedServices, updateRequest, requestEntity);
+                } catch (final Exception e) {
+                    logger.error("Failed to update variable registry", e);
+                    updateRequest.setFailureReason("An unexpected error has occurred: " + e);
+                }
+            };
+
+            variableRegistryThreadPool.submit(taskWrapper);
+
+            final VariableRegistryUpdateRequestEntity responseEntity = new VariableRegistryUpdateRequestEntity();
+            responseEntity.setRequestDto(dtoFactory.createVariableRegistryUpdateRequestDto(updateRequest));
+            responseEntity.getRequestDto().setUri(generateResourceUri("process-groups", groupId, "variable-registry", "update-requests", updateRequest.getRequestId()));
+
+            final URI location = URI.create(responseEntity.getRequestDto().getUri());
+            return Response.status(Status.ACCEPTED).location(location).entity(responseEntity).build();
+        }
+
+        final Revision requestRevision = getRevision(requestEntity.getProcessGroupRevision(), groupId);
+        return withWriteLock(
+            serviceFacade,
+            requestEntity,
+            requestRevision,
+            lookup -> authorizeVariableRegistryUpdate(lookup, groupId, affectedProcessors, affectedServices),
+            null,
+            (revision, varRegistryEntity) -> {
+                return updateVariableRegistryLocal(groupId, affectedProcessors, affectedServices, requestEntity);
+            });
+    }
+
+    /**
+     * Authorizes the current user to update the variable registry of the given Process Group: WRITE on the group
+     * itself, plus READ and WRITE on every affected Processor and Controller Service.
+     *
+     * @param lookup the lookup used to resolve the authorizables
+     * @param groupId the ID of the Process Group whose variable registry is being updated
+     * @param affectedProcessors the affected Processors, or <code>null</code> if there are none
+     * @param affectedServices the affected Controller Services, or <code>null</code> if there are none
+     */
+    private void authorizeVariableRegistryUpdate(final AuthorizableLookup lookup, final String groupId,
+        final Collection<AffectedComponentDTO> affectedProcessors, final Collection<AffectedComponentDTO> affectedServices) {
+
+        final NiFiUser user = NiFiUserUtils.getNiFiUser();
+
+        final Authorizable groupAuthorizable = lookup.getProcessGroup(groupId).getAuthorizable();
+        groupAuthorizable.authorize(authorizer, RequestAction.WRITE, user);
+
+        // For every component that is affected, the user must have READ permissions and WRITE permissions
+        // (because this action requires stopping the component).
+        if (affectedProcessors != null) {
+            for (final AffectedComponentDTO affectedComponent : affectedProcessors) {
+                final Authorizable authorizable = lookup.getProcessor(affectedComponent.getComponentId()).getAuthorizable();
+                authorizable.authorize(authorizer, RequestAction.READ, user);
+                authorizable.authorize(authorizer, RequestAction.WRITE, user);
+            }
+        }
+
+        if (affectedServices != null) {
+            for (final AffectedComponentDTO affectedComponent : affectedServices) {
+                final Authorizable authorizable = lookup.getControllerService(affectedComponent.getComponentId()).getAuthorizable();
+                authorizable.authorize(authorizer, RequestAction.READ, user);
+                authorizable.authorize(authorizer, RequestAction.WRITE, user);
+            }
+        }
+    }
+
+    /**
+     * Builds a Pause that waits half a second between polls for as long as the given update request is incomplete.
+     *
+     * @param updateRequest the update request whose completion ends the polling
+     * @return a Pause whose <code>pause()</code> returns <code>true</code> if the caller should keep polling, or
+     *         <code>false</code> if the request is complete or the waiting thread was interrupted
+     */
+    private Pause createPause(final VariableRegistryUpdateRequest updateRequest) {
+        return new Pause() {
+            @Override
+            public boolean pause() {
+                // Nothing to wait for once the request has finished.
+                if (!updateRequest.isComplete()) {
+                    try {
+                        TimeUnit.MILLISECONDS.sleep(500);
+                    } catch (final InterruptedException interrupted) {
+                        // Restore the interrupt flag and tell the caller to stop polling.
+                        Thread.currentThread().interrupt();
+                        return false;
+                    }
+
+                    // The request may have completed while this thread was sleeping.
+                    final boolean stillRunning = !updateRequest.isComplete();
+                    return stillRunning;
+                }
+
+                return false;
+            }
+        };
+    }
+
+    /**
+     * Carries out a variable registry update by issuing a sequence of requests through a Jersey client:
+     * stop the affected processors, disable the affected controller services, apply the variable updates,
+     * re-enable the services and finally restart the processors. The update request is marked complete
+     * once all of the steps have run.
+     *
+     * @param groupId the ID of the Process Group whose variable registry is being updated
+     * @param originalUri the URI of the original request
+     * @param affectedProcessors the affected Processors, or <code>null</code> if there are none
+     * @param affectedServices the affected Controller Services, or <code>null</code> if there are none
+     * @param updateRequest the update request that tracks the progress of the individual steps
+     * @param requestEntity the Variable Registry Entity that describes the update to apply
+     */
+    private void updateVariableRegistryReplicated(final String groupId, final URI originalUri, final Collection<AffectedComponentDTO> affectedProcessors,
+        final Collection<AffectedComponentDTO> affectedServices,
+        final VariableRegistryUpdateRequest updateRequest, final VariableRegistryEntity requestEntity) {
+
+        final NiFiProperties properties = getProperties();
+        final Client jerseyClient = WebUtils.createClient(new DefaultClientConfig(), SslContextFactory.createSslContext(properties));
+
+        // A new client is created for every update request, so it must be destroyed when the update
+        // finishes (successfully or not); otherwise its resources are leaked on every request.
+        try {
+            final int connectionTimeout = (int) FormatUtils.getTimeDuration(properties.getClusterNodeConnectionTimeout(), TimeUnit.MILLISECONDS);
+            final int readTimeout = (int) FormatUtils.getTimeDuration(properties.getClusterNodeReadTimeout(), TimeUnit.MILLISECONDS);
+            jerseyClient.getProperties().put(ClientConfig.PROPERTY_CONNECT_TIMEOUT, connectionTimeout);
+            jerseyClient.getProperties().put(ClientConfig.PROPERTY_READ_TIMEOUT, readTimeout);
+            jerseyClient.getProperties().put(ClientConfig.PROPERTY_FOLLOW_REDIRECTS, Boolean.TRUE);
+
+            final Pause pause = createPause(updateRequest);
+
+            // stop processors
+            if (affectedProcessors != null) {
+                logger.info("In order to update Variable Registry for Process Group with ID {}, "
+                    + "replicating request to stop {} affected processors", groupId, affectedProcessors.size());
+
+                scheduleProcessors(groupId, originalUri, jerseyClient, updateRequest, pause,
+                    affectedProcessors, ScheduledState.STOPPED, updateRequest.getStopProcessorsStep());
+            }
+
+            // disable controller services
+            if (affectedServices != null) {
+                logger.info("In order to update Variable Registry for Process Group with ID {}, "
+                    + "replicating request to disable {} affected Controller Services", groupId, affectedServices.size());
+
+                activateControllerServices(groupId, originalUri, jerseyClient, updateRequest, pause,
+                    affectedServices, ControllerServiceState.DISABLED, updateRequest.getDisableServicesStep());
+            }
+
+            // apply updates
+            logger.info("In order to update Variable Registry for Process Group with ID {}, "
+                + "replicating request to apply updates to variable registry", groupId);
+            applyVariableRegistryUpdate(groupId, originalUri, jerseyClient, updateRequest, requestEntity);
+
+            // re-enable controller services
+            if (affectedServices != null) {
+                logger.info("In order to update Variable Registry for Process Group with ID {}, "
+                    + "replicating request to re-enable {} affected services", groupId, affectedServices.size());
+
+                activateControllerServices(groupId, originalUri, jerseyClient, updateRequest, pause,
+                    affectedServices, ControllerServiceState.ENABLED, updateRequest.getEnableServicesStep());
+            }
+
+            // restart processors
+            if (affectedProcessors != null) {
+                logger.info("In order to update Variable Registry for Process Group with ID {}, "
+                    + "replicating request to restart {} affected processors", groupId, affectedProcessors.size());
+
+                scheduleProcessors(groupId, originalUri, jerseyClient, updateRequest, pause,
+                    affectedProcessors, ScheduledState.RUNNING, updateRequest.getStartProcessorsStep());
+            }
+
+            updateRequest.setComplete(true);
+        } finally {
+            jerseyClient.destroy();
+        }
+    }
+
+    /**
+     * Periodically polls the process group with the given ID, waiting for all processors whose ID's are given to have the given Scheduled State.
+     *
+     * @param client the Jersey Client to use for making the request
+     * @param originalUri the URI of the original request; its scheme, user info, host, port and fragment are reused to build the status URI
+     * @param groupId the ID of the Process Group to poll
+     * @param processorIds the ID of all Processors whose state should be equal to the given desired state
+     * @param desiredState the desired state for all processors with the ID's given
+     * @param pause the Pause that can be used to wait between polling
+     * @return <code>true</code> if successful, <code>false</code> if unable to wait for processors to reach the desired state
+     */
+    private boolean waitForProcessorStatus(final Client client, final URI originalUri, final String groupId, final Set<String> processorIds, final ScheduledState desiredState, final Pause pause) {
+        URI groupUri;
+        try {
+            groupUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(),
+                originalUri.getPort(), "/nifi-api/flow/process-groups/" + groupId + "/status", "recursive=true", originalUri.getFragment());
+        } catch (URISyntaxException e) {
+            throw new RuntimeException(e);
+        }
+
+        boolean continuePolling = true;
+        while (continuePolling) {
+            final ClientResponse response = client.resource(groupUri).header("Content-Type", "application/json").get(ClientResponse.class);
+            if (response.getStatus() != Status.OK.getStatusCode()) {
+                // The entity is never read on this path, so close the response explicitly to release it.
+                response.close();
+                return false;
+            }
+
+            final ProcessGroupStatusEntity statusEntity = response.getEntity(ProcessGroupStatusEntity.class);
+            final ProcessGroupStatusDTO statusDto = statusEntity.getProcessGroupStatus();
+            final ProcessGroupStatusSnapshotDTO statusSnapshotDto = statusDto.getAggregateSnapshot();
+
+            if (isProcessorStatusEqual(statusSnapshotDto, processorIds, desiredState)) {
+                logger.debug("All {} processors of interest now have the desired state of {}", processorIds.size(), desiredState);
+                return true;
+            }
+
+            // Not all of the processors are in the desired state. Pause for a bit and poll again.
+            continuePolling = pause.pause();
+        }
+
+        return false;
+    }
+
+ /**
+ * Periodically polls the process group with the given ID, waiting for all processors whose ID's are given to have the given Scheduled State.
+ *
+ * @param groupId the ID of the Process Group to poll
+ * @param processorIds the ID of all Processors whose state should be equal to the given desired state
+ * @param desiredState the desired state for all processors with the ID's given
+ * @param pause the Pause that can be used to wait between polling
+ * @return <code>true</code> if successful, <code>false</code> if unable to wait for processors to reach the desired state
+ */
+ private boolean waitForLocalProcessorStatus(final String groupId, final Set<String> processorIds, final ScheduledState desiredState, final Pause pause) {
+ boolean continuePolling = true;
+ while (continuePolling) {
+ final ProcessGroupStatusEntity statusEntity = serviceFacade.getProcessGroupStatus(groupId, true);
+ final ProcessGroupStatusDTO statusDto = statusEntity.getProcessGroupStatus();
+ final ProcessGroupStatusSnapshotDTO statusSnapshotDto = statusDto.getAggregateSnapshot();
+
+ if (isProcessorStatusEqual(statusSnapshotDto, processorIds, desiredState)) {
+ logger.debug("All {} processors of interest now have the desired state of {}", processorIds.size(), desiredState);
+ return true;
+ }
+
+ // Not all of the processors are in the desired state. Pause for a bit and poll again.
+ continuePolling = pause.pause();
+ }
+
+ return false;
+ }
+
+ /**
+  * Determines whether every processor of interest found in the given status snapshot, or in any of its nested child group
+  * snapshots, has the desired state. Processors whose IDs are not in <code>processorIds</code> are ignored. When the desired state
+  * is STOPPED, a processor only counts as matching once its active thread count is zero.
+  *
+  * @param statusSnapshot the status snapshot of the Process Group to inspect
+  * @param processorIds the IDs of the processors of interest
+  * @param desiredState the state that each processor of interest must have
+  * @return <code>true</code> if no processor of interest at this level or below deviates from the desired state, <code>false</code> otherwise
+  */
+ private boolean isProcessorStatusEqual(final ProcessGroupStatusSnapshotDTO statusSnapshot, final Set<String> processorIds, final ScheduledState desiredState) {
+     final String desiredStateName = desiredState.name();
+
+     // Look for a processor of interest at this level that is not (yet) in the desired state.
+     final boolean mismatchAtThisLevel = statusSnapshot.getProcessorStatusSnapshots().stream()
+         .map(entity -> entity.getProcessorStatusSnapshot())
+         .filter(status -> processorIds.contains(status.getId()))
+         .anyMatch(status -> !desiredStateName.equalsIgnoreCase(status.getRunStatus())
+             || (desiredState == ScheduledState.STOPPED && status.getActiveThreadCount() != 0));
+
+     if (mismatchAtThisLevel) {
+         return false;
+     }
+
+     // This level matches; every child group must match as well.
+     return statusSnapshot.getProcessGroupStatusSnapshots().stream()
+         .map(childGroupEntity -> childGroupEntity.getProcessGroupStatusSnapshot())
+         .allMatch(childGroupStatus -> isProcessorStatusEqual(childGroupStatus, processorIds, desiredState));
+ }
+
+
+
+ /**
+  * Periodically requests the controller services of the Process Group with the given ID (including those of descendant groups,
+  * excluding those of ancestor groups), waiting for all controller services whose IDs are given to have the given Controller Service State.
+  *
+  * @param client the Jersey Client to use for making the HTTP Request
+  * @param originalUri the URI whose scheme, user info, host, port and fragment are reused when building the request URI
+  * @param groupId the ID of the Process Group to poll
+  * @param serviceIds the IDs of all Controller Services whose state should be equal to the given desired state
+  * @param desiredState the desired state for all services with the IDs given
+  * @param pause the Pause that is used to wait between polls; polling ends when it returns <code>false</code>
+  * @return <code>true</code> if all services of interest reached the desired state, <code>false</code> if a request did not return
+  *         200 (OK) or the Pause ended the polling first
+  */
+ private boolean waitForControllerServiceStatus(final Client client, final URI originalUri, final String groupId, final Set<String> serviceIds, final ControllerServiceState desiredState,
+     final Pause pause) {
+     final URI groupUri;
+     try {
+         // Query parameters are separated by '&'. With a ',' the whole string would be treated as the value of
+         // includeAncestorGroups and includeDescendantGroups would never be sent.
+         groupUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(),
+             originalUri.getPort(), "/nifi-api/flow/process-groups/" + groupId + "/controller-services", "includeAncestorGroups=false&includeDescendantGroups=true", originalUri.getFragment());
+     } catch (final URISyntaxException e) {
+         throw new RuntimeException(e);
+     }
+
+     final String desiredStateName = desiredState.name();
+
+     boolean continuePolling = true;
+     while (continuePolling) {
+         final ClientResponse response = client.resource(groupUri).header("Content-Type", "application/json").get(ClientResponse.class);
+         if (response.getStatus() != Status.OK.getStatusCode()) {
+             return false;
+         }
+
+         final ControllerServicesEntity controllerServicesEntity = response.getEntity(ControllerServicesEntity.class);
+         final Set<ControllerServiceEntity> serviceEntities = controllerServicesEntity.getControllerServices();
+
+         // Compare from the constant side so that a service reporting no state counts as "not yet matching" instead of
+         // throwing a NullPointerException (same comparison as waitForLocalControllerServiceStatus).
+         final boolean allServicesMatch = serviceEntities.stream()
+             .map(entity -> entity.getComponent())
+             .filter(service -> serviceIds.contains(service.getId()))
+             .map(service -> service.getState())
+             .allMatch(state -> desiredStateName.equals(state));
+
+         if (allServicesMatch) {
+             logger.debug("All {} controller services of interest now have the desired state of {}", serviceIds.size(), desiredState);
+             return true;
+         }
+
+         // Not all of the services are in the desired state. Pause for a bit and poll again.
+         continuePolling = pause.pause();
+     }
+
+     return false;
+ }
+
+
+ /**
+  * Repeatedly asks the service facade for the controller services of the Process Group with the given ID, returning as soon as
+  * every controller service of interest reports the desired Controller Service State or the given Pause indicates that polling should stop.
+  *
+  * @param groupId the ID of the Process Group to poll
+  * @param serviceIds the IDs of all Controller Services whose state should be equal to the given desired state
+  * @param desiredState the desired state for all services with the IDs given
+  * @param pause the Pause that is used to wait between polls; polling ends when it returns <code>false</code>
+  * @param user the user that is retrieving the controller services
+  * @return <code>true</code> if all services of interest reached the desired state, <code>false</code> if the Pause ended the polling first
+  */
+ private boolean waitForLocalControllerServiceStatus(final String groupId, final Set<String> serviceIds, final ControllerServiceState desiredState, final Pause pause, final NiFiUser user) {
+     final String desiredStateName = desiredState.name();
+
+     do {
+         final Set<ControllerServiceEntity> currentServices = serviceFacade.getControllerServices(groupId, false, true, user);
+
+         final boolean anyServiceLagging = currentServices.stream()
+             .map(entity -> entity.getComponent())
+             .filter(service -> serviceIds.contains(service.getId()))
+             .map(service -> service.getState())
+             .anyMatch(state -> !desiredStateName.equals(state));
+
+         if (!anyServiceLagging) {
+             logger.debug("All {} controller services of interest now have the desired state of {}", serviceIds.size(), desiredState);
+             return true;
+         }
+
+         // At least one service has not reached the desired state yet; wait, then poll again unless told to stop.
+     } while (pause.pause());
+
+     return false;
+ }
+
+ /**
+  * Creates a new Variable Registry update request for the given Process Group and registers it in the request map, after first
+  * purging requests that are complete and were last updated before the expiration cutoff.
+  *
+  * @param groupId the ID of the Process Group whose Variable Registry is to be updated
+  * @return the newly registered update request
+  * @throws IllegalStateException if the maximum number of update requests is already registered after purging
+  */
+ private VariableRegistryUpdateRequest createVariableRegistryUpdateRequest(final String groupId) {
+     final VariableRegistryUpdateRequest updateRequest = new VariableRegistryUpdateRequest(UUID.randomUUID().toString(), groupId);
+
+     // Before adding to the request map, purge any completed request that has expired. removeIf on the values view
+     // removes the corresponding entries without risking a ConcurrentModificationException.
+     final Date expirationCutoff = new Date(System.currentTimeMillis() - VARIABLE_REGISTRY_UPDATE_REQUEST_EXPIRATION);
+     varRegistryUpdateRequests.values().removeIf(request -> request.isComplete() && request.getLastUpdated().before(expirationCutoff));
+
+     // Reject the new request as soon as the cap is reached, so that the map never holds more than the maximum.
+     final int requestCount = varRegistryUpdateRequests.size();
+     if (requestCount >= MAX_VARIABLE_REGISTRY_UPDATE_REQUESTS) {
+         throw new IllegalStateException("There are already " + requestCount + " update requests for variable registries. "
+             + "Cannot issue any more requests until the older ones are deleted or expire");
+     }
+
+     varRegistryUpdateRequests.put(updateRequest.getRequestId(), updateRequest);
+     return updateRequest;
+ }
+
+ private Response updateVariableRegistryLocal(final String groupId, final List<AffectedComponentDTO> affectedProcessors, final List<AffectedComponentDTO> affectedServices,
+ final VariableRegistryEntity requestEntity) {
+
+ final Set<String> affectedProcessorIds = affectedProcessors == null ? Collections.emptySet() : affectedProcessors.stream()
+ .map(component -> component.getComponentId())
+ .collect(Collectors.toSet());
+ Map<String, Revision> processorRevisionMap = getRevisions(groupId, affectedProcessorIds);
+
+ final Set<String> affectedServiceIds = affectedServices == null ? Collections.emptySet() : affectedServices.stream()
+ .map(component -> component.getComponentId())
+ .collect(Collectors.toSet());
+ Map<String, Revision> serviceRevisionMap = getRevisions(groupId, affectedServiceIds);
+
+ // update the variable registry
+ final VariableRegistryUpdateRequest updateRequest = createVariableRegistryUpdateRequest(groupId);
+ updateRequest.getIdentifyRelevantComponentsStep().setComplete(true);
+ final Pause pause = createPause(updateRequest);
+
+ final Revision requestRevision = getRevision(requestEntity.getProcessGroupRevision(), groupId);
+
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
+ final Runnable updateTask = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ // Stop processors
+ performUpdateVariableRegistryStep(groupId, updateRequest, updateRequest.getStopProcessorsStep(), "Stopping Processors",
+ () -> stopProcessors(user, updateRequest, groupId, processorRevisionMap, pause));
+
+ // Update revision map because this will have modified the revisions of our components.
+ final Map<String, Revision> updatedProcessorRevisionMap = getRevisions(groupId, affectedProcessorIds);
+
+ // Disable controller services
+ performUpdateVariableRegistryStep(groupId, updateRequest, updateRequest.getDisableServicesStep(), "Disabling Controller Services",
+ () -> disableControllerServices(user, updateRequest, groupId, serviceRevisionMap, pause));
+
+ // Update revision map because this will have modified the revisions of our components.
+ final Map<String, Revision> updatedServiceRevisionMap = getRevisions(groupId, affectedServiceIds);
+
+ // Apply the updates
+ performUpdateVariableRegistryStep(groupId, updateRequest, updateRequest.getApplyUpdatesStep(), "Applying updates to Variable Registry",
+ () -> serviceFacade.updateVariableRegistry(user, requestRevision, requestEntity.getVariableRegistry()));
+
+ // Re-enable the controller services
+ performUpdateVariableRegistryStep(groupId, updateRequest, updateRequest.getEnableServicesStep(), "Re-enabling Controller Services",
+ () -> enableControllerServices(user, groupId, updatedServiceRevisionMap, pause));
+
+ // Restart processors
+ performUpdateVariableRegistryStep(groupId, updateRequest, updateRequest.getStartProcessorsStep(), "Restarting Processors",
+ () -> startProcessors(user, groupId, updatedProcessorRevisionMap, pause));
+
+ // Set complete
+ updateRequest.setComplete(true);
+ updateRequest.setLastUpdated(new Date());
+ } catch (final Exception e) {
+ logger.error("Failed to update Variable Registry for Proces Group with ID " + groupId, e);
+ updateRequest.setFailureReason("An unexpected error has occurred: " + e);
+ }
+ }
+ };
+
+ // Submit the task to be run in the background
+ variableRegistryThreadPool.submit(updateTask);
+
+ final VariableRegistryUpdateRequestEntity responseEntity = new VariableRegistryUpdateRequestEntity();
+ responseEntity.setRequestDto(dtoFactory.createVariableRegistryUpdateRequestDto(updateRequest));
+ responseEntity.getRequestDto().setUri(generateResourceUri("process-groups", groupId, "variable-registry", "update-requests", updateRequest.getRequestId()));
+
+ final URI location = URI.create(responseEntity.getRequestDto().getUri());
+ return Response.status(Status.ACCEPTED).location(location).entity(responseEntity).build();
+ }
+
+ private Map<String, Revision> getRevisions(final String groupId, final Set<String> componentIds) {
+ final Set<Revision> processorRevisions = serviceFacade.getRevisionsFromGroup(groupId, group -> componentIds);
+ return processorRevisions.stream().collect(Collectors.toMap(revision -> revision.getComponentId(), Function.identity()));
+ }
+
+ private void performUpdateVariableRegistryStep(final String groupId, final VariableRegistryUpdateRequest request, final VariableRegistryUpdateStep step,
+ final String stepDescription, final Runnable action) {
+
+ if (request.isComplete()) {
+ logger.info("In updating Variable Registry for Process Group with ID {}"
+ + ", skipping the following step because the request has completed already: {}", groupId, stepDescription);
+ return;
+ }
+
+ try {
+ logger.info("In order to update Variable Registry for Process Group with ID {}, {}", groupId, stepDescription);
+
+ action.run();
+ step.setComplete(true);
+ } catch (final Exception e) {
+ request.setComplete(true);
+ logger.error("Failed to update variable registry for Process Group with ID {}", groupId, e);
+
+ step.setComplete(true);
+ step.setFailureReason(e.getMessage());
+ request.setFailureReason("Failed to update Variable Registry because failed while performing step: " + stepDescription);
+ }
+
+ request.setLastUpdated(new Date());
+ }
+
+ private void stopProcessors(final NiFiUser user, final VariableRegistryUpdateRequest updateRequest, final String processGroupId,
+ final Map<String, Revision> processorRevisions, final Pause pause) {
+
+ if (processorRevisions.isEmpty()) {
+ return;
+ }
+
+ serviceFacade.verifyScheduleComponents(processGroupId, ScheduledState.STOPPED, processorRevisions.keySet());
+ serviceFacade.scheduleComponents(user, processGroupId, ScheduledState.STOPPED, processorRevisions);
+ waitForLocalProcessorStatus(processGroupId, processorRevisions.keySet(), ScheduledState.STOPPED, pause);
+ }
+
+ private void startProcessors(final NiFiUser user, final String processGroupId, final Map<String, Revision> processorRevisions, final Pause pause) {
+ if (processorRevisions.isEmpty()) {
+ return;
+ }
+
+ serviceFacade.verifyScheduleComponents(processGroupId, ScheduledState.RUNNING, processorRevisions.keySet());
+ serviceFacade.scheduleComponents(user, processGroupId, ScheduledState.RUNNING, processorRevisions);
+ waitForLocalProcessorStatus(processGroupId, processorRevisions.keySet(), ScheduledState.RUNNING, pause);
+ }
+
+ private void disableControllerServices(final NiFiUser user, final VariableRegistryUpdateRequest updateRequest, final String processGroupId,
+ final Map<String, Revision> serviceRevisions, final Pause pause) {
+
+ if (serviceRevisions.isEmpty()) {
+ return;
+ }
+
+ serviceFacade.verifyActivateControllerServices(processGroupId, ControllerServiceState.DISABLED, serviceRevisions.keySet());
+ serviceFacade.activateControllerServices(user, processGroupId, ControllerServiceState.DISABLED, serviceRevisions);
+ waitForLocalControllerServiceStatus(processGroupId, serviceRevisions.keySet(), ControllerServiceState.DISABLED, pause, user);
+ }
+
+ private void enableControllerServices(final NiFiUser user, final String processGroupId, final Map<String, Revision> serviceRevisions, final Pause pause) {
+ if (serviceRevisions.isEmpty()) {
+ return;
+ }
+
+ serviceFacade.verifyActivateControllerServices(processGroupId, ControllerServiceState.ENABLED, serviceRevisions.keySet());
+ serviceFacade.activateControllerServices(user, processGroupId, ControllerServiceState.ENABLED, serviceRevisions);
+ waitForLocalControllerServiceStatus(processGroupId, serviceRevisions.keySet(), ControllerServiceState.ENABLED, pause, user);
+ }
+
+
+ /**
+  * Sends a request to transition the affected processors to the desired Scheduled State and then polls until all of them have
+  * reached it, recording the outcome on the given update step. The method is used both for stopping and for (re)starting
+  * processors, so failures are always recorded against <code>updateStep</code>.
+  *
+  * @param groupId the ID of the Process Group that contains the processors
+  * @param originalUri the URI whose scheme, user info, host, port and fragment are reused when building the request URI
+  * @param jerseyClient the Jersey Client to use for making the requests
+  * @param updateRequest the update request that receives the failure reason / last-updated timestamp
+  * @param pause the Pause that is used to wait between polls
+  * @param affectedProcessors the processors to transition
+  * @param desiredState the state that the processors should be transitioned to
+  * @param updateStep the step of the update request that this call performs
+  */
+ private void scheduleProcessors(final String groupId, final URI originalUri, final Client jerseyClient, final VariableRegistryUpdateRequest updateRequest,
+     final Pause pause, final Collection<AffectedComponentDTO> affectedProcessors, final ScheduledState desiredState, final VariableRegistryUpdateStep updateStep) {
+     final Set<String> affectedProcessorIds = affectedProcessors.stream()
+         .map(component -> component.getComponentId())
+         .collect(Collectors.toSet());
+
+     final Map<String, Revision> processorRevisionMap = getRevisions(groupId, affectedProcessorIds);
+     final Map<String, RevisionDTO> processorRevisionDtoMap = processorRevisionMap.entrySet().stream().collect(
+         Collectors.toMap(Map.Entry::getKey, entry -> dtoFactory.createRevisionDTO(entry.getValue())));
+
+     final ScheduleComponentsEntity scheduleEntity = new ScheduleComponentsEntity();
+     scheduleEntity.setComponents(processorRevisionDtoMap);
+     scheduleEntity.setId(groupId);
+     scheduleEntity.setState(desiredState.name());
+
+     final URI scheduleUri;
+     try {
+         scheduleUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(),
+             originalUri.getPort(), "/nifi-api/flow/process-groups/" + groupId, null, originalUri.getFragment());
+     } catch (final URISyntaxException e) {
+         throw new RuntimeException(e);
+     }
+
+     final ClientResponse scheduleResponse = jerseyClient.resource(scheduleUri)
+         .header("Content-Type", "application/json")
+         .entity(scheduleEntity)
+         .put(ClientResponse.class);
+
+     if (scheduleResponse.getStatus() != Status.OK.getStatusCode()) {
+         // Record the failure on the step actually being performed. Using the "stop processors" step here would
+         // misattribute a failed restart to the stop step.
+         updateStep.setFailureReason("Failed while " + updateStep.getDescription());
+         updateRequest.setFailureReason("Failed while " + updateStep.getDescription());
+         return;
+     }
+
+     updateRequest.setLastUpdated(new Date());
+     final boolean processorsTransitioned = waitForProcessorStatus(jerseyClient, originalUri, groupId, affectedProcessorIds, desiredState, pause);
+     if (processorsTransitioned) {
+         updateStep.setComplete(true);
+     } else {
+         updateStep.setFailureReason("Failed while " + updateStep.getDescription());
+         updateRequest.setFailureReason("Failed while " + updateStep.getDescription());
+     }
+ }
+
+ /**
+  * Sends a request to transition the affected controller services to the desired state and then polls until all of them have
+  * reached it. On success the given update step is marked complete; if the request does not return 200 (OK) or the services do
+  * not reach the desired state, a failure reason is recorded on both the step and the update request.
+  */
+ private void activateControllerServices(final String groupId, final URI originalUri, final Client jerseyClient, final VariableRegistryUpdateRequest updateRequest,
+     final Pause pause, final Collection<AffectedComponentDTO> affectedServices, final ControllerServiceState desiredState, final VariableRegistryUpdateStep updateStep) {
+
+     final Set<String> serviceIds = affectedServices.stream()
+         .map(component -> component.getComponentId())
+         .collect(Collectors.toSet());
+
+     final Map<String, RevisionDTO> revisionDtosById = getRevisions(groupId, serviceIds).entrySet().stream().collect(
+         Collectors.toMap(Map.Entry::getKey, entry -> dtoFactory.createRevisionDTO(entry.getValue())));
+
+     final ActivateControllerServicesEntity activationEntity = new ActivateControllerServicesEntity();
+     activationEntity.setComponents(revisionDtosById);
+     activationEntity.setId(groupId);
+     activationEntity.setState(desiredState.name());
+
+     final URI activationUri;
+     try {
+         activationUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(),
+             originalUri.getPort(), "/nifi-api/flow/process-groups/" + groupId + "/controller-services", null, originalUri.getFragment());
+     } catch (final URISyntaxException e) {
+         throw new RuntimeException(e);
+     }
+
+     final ClientResponse activationResponse = jerseyClient.resource(activationUri)
+         .header("Content-Type", "application/json")
+         .entity(activationEntity)
+         .put(ClientResponse.class);
+
+     // Only wait for the services when the activation request itself was accepted.
+     boolean servicesTransitioned = false;
+     if (activationResponse.getStatus() == Status.OK.getStatusCode()) {
+         updateRequest.setLastUpdated(new Date());
+         servicesTransitioned = waitForControllerServiceStatus(jerseyClient, originalUri, groupId, serviceIds, desiredState, pause);
+     }
+
+     if (servicesTransitioned) {
+         updateStep.setComplete(true);
+     } else {
+         updateStep.setFailureReason("Failed while " + updateStep.getDescription());
+         updateRequest.setFailureReason("Failed while " + updateStep.getDescription());
+     }
+ }
+
+
+ /**
+  * Sends the given Variable Registry entity in a PUT request to the variable-registry endpoint of the given Process Group.
+  * The request's last-updated timestamp is refreshed in every case; if the response is not 200 (OK), a failure reason is
+  * recorded on both the "apply updates" step and the update request.
+  */
+ private void applyVariableRegistryUpdate(final String groupId, final URI originalUri, final Client jerseyClient, final VariableRegistryUpdateRequest updateRequest,
+     final VariableRegistryEntity updateEntity) {
+
+     final URI registryUri;
+     try {
+         registryUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(),
+             originalUri.getPort(), "/nifi-api/process-groups/" + groupId + "/variable-registry", null, originalUri.getFragment());
+     } catch (final URISyntaxException e) {
+         throw new RuntimeException(e);
+     }
+
+     final ClientResponse response = jerseyClient.resource(registryUri)
+         .header("Content-Type", "application/json")
+         .entity(updateEntity)
+         .put(ClientResponse.class);
+
+     final boolean applied = response.getStatus() == Status.OK.getStatusCode();
+     updateRequest.setLastUpdated(new Date());
+
+     if (!applied) {
+         updateRequest.getApplyUpdatesStep().setFailureReason("Failed to apply updates to the Variable Registry");
+         updateRequest.setFailureReason("Failed to apply updates to the Variable Registry");
+     }
+ }
+
/**
* Removes the specified process group reference.
*
@@ -2426,4 +3378,8 @@ public class ProcessGroupResource extends ApplicationResource {
public void setAuthorizer(Authorizer authorizer) {
this.authorizer = authorizer; // stores the given Authorizer in this resource's field
}
+
+ public void setDtoFactory(DtoFactory dtoFactory) {
+ this.dtoFactory = dtoFactory; // stores the DtoFactory that the variable registry code in this resource uses to build DTOs
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index 4332a0c..a445e49 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -16,6 +16,33 @@
*/
package org.apache.nifi.web.api.dto;
+import java.text.Collator;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import javax.ws.rs.WebApplicationException;
+
import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.action.Action;
@@ -47,6 +74,7 @@ import org.apache.nifi.authorization.Resource;
import org.apache.nifi.authorization.User;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.resource.ComponentAuthorizable;
+import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.bundle.BundleCoordinate;
@@ -110,6 +138,9 @@ import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
import org.apache.nifi.provenance.lineage.LineageEdge;
import org.apache.nifi.provenance.lineage.LineageNode;
import org.apache.nifi.provenance.lineage.ProvenanceEventLineageNode;
+import org.apache.nifi.registry.ComponentVariableRegistry;
+import org.apache.nifi.registry.variable.VariableRegistryUpdateRequest;
+import org.apache.nifi.registry.variable.VariableRegistryUpdateStep;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.RootGroupPort;
import org.apache.nifi.reporting.Bulletin;
@@ -161,35 +192,10 @@ import org.apache.nifi.web.api.entity.ProcessGroupStatusSnapshotEntity;
import org.apache.nifi.web.api.entity.ProcessorStatusSnapshotEntity;
import org.apache.nifi.web.api.entity.RemoteProcessGroupStatusSnapshotEntity;
import org.apache.nifi.web.api.entity.TenantEntity;
+import org.apache.nifi.web.api.entity.VariableEntity;
import org.apache.nifi.web.controller.ControllerFacade;
import org.apache.nifi.web.revision.RevisionManager;
-import javax.ws.rs.WebApplicationException;
-import java.text.Collator;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TimeZone;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-
public final class DtoFactory {
@SuppressWarnings("rawtypes")
@@ -1712,9 +1718,34 @@ public final class DtoFactory {
* @return dto
*/
public PermissionsDTO createPermissionsDto(final Authorizable authorizable) {
+ return createPermissionsDto(authorizable, NiFiUserUtils.getNiFiUser()); // delegates to the two-argument overload, using the user obtained from NiFiUserUtils
+ }
+
+ /**
+ * Creates the PermissionsDTO based on the specified Authorizable for the given user. The DTO's canRead and canWrite
+ * flags are set to the results of the Authorizable's READ and WRITE authorization checks, performed with this factory's authorizer.
+ *
+ * @param authorizable the Authorizable whose READ and WRITE authorization is checked
+ * @param user the NiFi User for which the Permissions are being created
+ * @return a new dto holding the results of the two authorization checks
+ */
+ public PermissionsDTO createPermissionsDto(final Authorizable authorizable, final NiFiUser user) {
final PermissionsDTO dto = new PermissionsDTO();
- dto.setCanRead(authorizable.isAuthorized(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()));
- dto.setCanWrite(authorizable.isAuthorized(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()));
+ dto.setCanRead(authorizable.isAuthorized(authorizer, RequestAction.READ, user));
+ dto.setCanWrite(authorizable.isAuthorized(authorizer, RequestAction.WRITE, user));
+ return dto;
+ }
+
+ public AffectedComponentDTO createAffectedComponentDto(final ConfiguredComponent component) { // builds a DTO carrying the component's ID, parent group ID and component type
+ final AffectedComponentDTO dto = new AffectedComponentDTO();
+ dto.setComponentId(component.getIdentifier());
+ dto.setParentGroupId(component.getProcessGroupIdentifier());
+
+ if (component instanceof ProcessorNode) {
+ dto.setComponentType(AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR);
+ } else if (component instanceof ControllerServiceNode) {
+ dto.setComponentType(AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE);
+ } // a component that is neither a processor nor a controller service leaves the component type unset
+
return dto;
}
@@ -1998,6 +2029,10 @@ public final class DtoFactory {
dto.setComments(group.getComments());
dto.setName(group.getName());
+ final Map<String, String> variables = group.getVariableRegistry().getVariableMap().entrySet().stream()
+ .collect(Collectors.toMap(entry -> entry.getKey().getName(), entry -> entry.getValue()));
+ group.setVariables(variables);
+
final ProcessGroup parentGroup = group.getParent();
if (parentGroup != null) {
dto.setParentGroupId(parentGroup.getIdentifier());
@@ -2079,6 +2114,128 @@ public final class DtoFactory {
return deprecationNotice == null ? null : deprecationNotice.reason();
}
+
+ /**
+  * Creates a VariableRegistryDTO describing every variable in the given Process Group's Variable Registry. Each variable is
+  * accompanied by the components it affects, and its entity is flagged writable only if the current user may both read and
+  * write every one of those components.
+  *
+  * @param processGroup the Process Group whose Variable Registry is described
+  * @return the dto
+  */
+ public VariableRegistryDTO createVariableRegistryDto(final ProcessGroup processGroup) {
+     final ComponentVariableRegistry registry = processGroup.getVariableRegistry();
+
+     final List<String> names = registry.getVariableMap().keySet().stream()
+         .map(descriptor -> descriptor.getName())
+         .collect(Collectors.toList());
+
+     final Set<VariableEntity> entities = new LinkedHashSet<>();
+
+     for (final String name : names) {
+         final VariableDTO variable = new VariableDTO();
+         variable.setName(name);
+         variable.setValue(registry.getVariableValue(name));
+         variable.setProcessGroupId(processGroup.getIdentifier());
+
+         final Set<ConfiguredComponent> affected = processGroup.getComponentsAffectedByVariable(name);
+         final Set<AffectedComponentDTO> affectedDtos = affected.stream()
+             .map(component -> createAffectedComponentDto(component))
+             .collect(Collectors.toSet());
+
+         // Writable only when no affected component is missing read or write permission; evaluation stops at the first such component.
+         final boolean writable = affected.stream()
+             .map(component -> createPermissionsDto(component))
+             .allMatch(permissions -> permissions.getCanRead() && permissions.getCanWrite());
+
+         variable.setAffectedComponents(affectedDtos);
+
+         final VariableEntity entity = new VariableEntity();
+         entity.setVariable(variable);
+         entity.setCanWrite(writable);
+
+         entities.add(entity);
+     }
+
+     final VariableRegistryDTO result = new VariableRegistryDTO();
+     result.setProcessGroupId(processGroup.getIdentifier());
+     result.setVariables(entities);
+
+     return result;
+ }
+
+ /**
+  * Creates a dto for the given Variable Registry update request, including one step dto per update step in the order:
+  * identify relevant components, stop processors, disable services, apply updates, enable services, start processors.
+  *
+  * @param request the update request to describe
+  * @return the dto
+  */
+ public VariableRegistryUpdateRequestDTO createVariableRegistryUpdateRequestDto(final VariableRegistryUpdateRequest request) {
+     final VariableRegistryUpdateRequestDTO requestDto = new VariableRegistryUpdateRequestDTO();
+     requestDto.setComplete(request.isComplete());
+     requestDto.setFailureReason(request.getFailureReason());
+     requestDto.setLastUpdated(request.getLastUpdated());
+     requestDto.setProcessGroupId(request.getProcessGroupId());
+     requestDto.setRequestId(request.getRequestId());
+     requestDto.setSubmissionTime(request.getSubmissionTime());
+
+     final List<VariableRegistryUpdateStep> orderedSteps = Arrays.asList(
+         request.getIdentifyRelevantComponentsStep(),
+         request.getStopProcessorsStep(),
+         request.getDisableServicesStep(),
+         request.getApplyUpdatesStep(),
+         request.getEnableServicesStep(),
+         request.getStartProcessorsStep());
+
+     final List<VariableRegistryUpdateStepDTO> stepDtos = new ArrayList<>();
+     for (final VariableRegistryUpdateStep step : orderedSteps) {
+         stepDtos.add(createVariableRegistryUpdateStepDto(step));
+     }
+     requestDto.setUpdateSteps(stepDtos);
+
+     return requestDto;
+ }
+
+ /**
+  * Creates a dto that copies the description, completion flag and failure reason of the given update step.
+  *
+  * @param step the update step to describe
+  * @return the dto
+  */
+ public VariableRegistryUpdateStepDTO createVariableRegistryUpdateStepDto(final VariableRegistryUpdateStep step) {
+     final VariableRegistryUpdateStepDTO stepDto = new VariableRegistryUpdateStepDTO();
+
+     stepDto.setComplete(step.isComplete());
+     stepDto.setDescription(step.getDescription());
+     stepDto.setFailureReason(step.getFailureReason());
+
+     return stepDto;
+ }
+
+
+ /**
+  * Creates a copy of the given Variable Registry dto in which every variable carries the components of the given Process Group
+  * that are affected by it. Each variable entity is flagged writable only if the current user may both read and write every
+  * affected component.
+  *
+  * @param variableRegistry the Variable Registry dto whose variables (name and value) are copied
+  * @param group the Process Group that the Variable Registry belongs to
+  * @return a new dto for the same Process Group with the affected components populated
+  * @throws IllegalArgumentException if the dto's Process Group ID differs from the ID of the given Process Group
+  */
+ public VariableRegistryDTO populateAffectedComponents(final VariableRegistryDTO variableRegistry, final ProcessGroup group) {
+     if (!group.getIdentifier().equals(variableRegistry.getProcessGroupId())) {
+         throw new IllegalArgumentException("Variable Registry does not have the same Group ID as the given Process Group");
+     }
+
+     final Set<VariableEntity> variableEntities = new LinkedHashSet<>();
+
+     for (final VariableEntity inputEntity : variableRegistry.getVariables()) {
+         final VariableEntity entity = new VariableEntity();
+
+         final VariableDTO inputDto = inputEntity.getVariable();
+         final VariableDTO variableDto = new VariableDTO();
+         variableDto.setName(inputDto.getName());
+         variableDto.setValue(inputDto.getValue());
+         variableDto.setProcessGroupId(group.getIdentifier());
+
+         final Set<ConfiguredComponent> affectedComponents = group.getComponentsAffectedByVariable(variableDto.getName());
+         final Set<AffectedComponentDTO> affectedComponentDtos = affectedComponents.stream()
+             .map(component -> createAffectedComponentDto(component))
+             .collect(Collectors.toSet());
+
+         boolean canWrite = true;
+         for (final ConfiguredComponent component : affectedComponents) {
+             final PermissionsDTO permissions = createPermissionsDto(component);
+             if (!permissions.getCanRead() || !permissions.getCanWrite()) {
+                 canWrite = false;
+                 break;
+             }
+         }
+
+         variableDto.setAffectedComponents(affectedComponentDtos);
+
+         entity.setCanWrite(canWrite);
+         // Attach the populated copy. Attaching the input dto would discard the affected components computed above.
+         entity.setVariable(variableDto);
+
+         variableEntities.add(entity);
+     }
+
+     final VariableRegistryDTO registryDto = new VariableRegistryDTO();
+     registryDto.setProcessGroupId(group.getIdentifier());
+     registryDto.setVariables(variableEntities);
+
+     return registryDto;
+ }
+
+
/**
* Gets the capability description from the specified class.
*/
@@ -3016,6 +3173,10 @@ public final class DtoFactory {
copy.setActiveRemotePortCount(original.getActiveRemotePortCount());
copy.setInactiveRemotePortCount(original.getInactiveRemotePortCount());
+ if (original.getVariables() != null) {
+ copy.setVariables(new HashMap<>(original.getVariables()));
+ }
+
return copy;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java
index 41249ba..a7f370a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java
@@ -65,6 +65,7 @@ import org.apache.nifi.web.api.entity.StatusHistoryEntity;
import org.apache.nifi.web.api.entity.TenantEntity;
import org.apache.nifi.web.api.entity.UserEntity;
import org.apache.nifi.web.api.entity.UserGroupEntity;
+import org.apache.nifi.web.api.entity.VariableRegistryEntity;
import java.util.Date;
import java.util.List;
@@ -431,6 +432,18 @@ public final class EntityFactory {
return entity;
}
+ /**
+  * Creates the entity that wraps a Process Group's Variable Registry.
+  *
+  * <p>The revision is always set as the entity's Process Group revision. The registry DTO is
+  * attached only when it is non-null and the supplied permissions are non-null and allow reading.</p>
+  *
+  * @param dto the Variable Registry DTO, may be null
+  * @param revision the revision to record as the Process Group revision
+  * @param permissions the permissions used to decide whether the DTO may be exposed, may be null
+  * @return the newly created entity
+  */
+ public VariableRegistryEntity createVariableRegistryEntity(final VariableRegistryDTO dto, final RevisionDTO revision, final PermissionsDTO permissions) {
+     final VariableRegistryEntity registryEntity = new VariableRegistryEntity();
+     registryEntity.setProcessGroupRevision(revision);
+
+     // Checks run in the same order as before: dto first, then permissions, then the read flag.
+     final boolean readable = dto != null && permissions != null && permissions.getCanRead();
+     if (readable) {
+         registryEntity.setVariableRegistry(dto);
+     }
+
+     return registryEntity;
+ }
+
public ControllerServiceEntity createControllerServiceEntity(final ControllerServiceDTO dto, final RevisionDTO revision, final PermissionsDTO permissions, final List<BulletinEntity> bulletins) {
final ControllerServiceEntity entity = new ControllerServiceEntity();
entity.setRevision(revision);
http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ControllerServiceDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ControllerServiceDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ControllerServiceDAO.java
index fa92425..0409e95 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ControllerServiceDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ControllerServiceDAO.java
@@ -63,7 +63,7 @@ public interface ControllerServiceDAO {
*
* @return The controller services
*/
- Set<ControllerServiceNode> getControllerServices(String groupId);
+ Set<ControllerServiceNode> getControllerServices(String groupId, boolean includeAncestorGroups, boolean includeDescendantGroups);
/**
* Updates the specified controller service.
http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java
index 155b36e..d7ca806 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java
@@ -16,11 +16,14 @@
*/
package org.apache.nifi.web.dao;
+import java.util.Set;
+import java.util.concurrent.Future;
+
import org.apache.nifi.controller.ScheduledState;
+import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
-
-import java.util.Set;
+import org.apache.nifi.web.api.dto.VariableRegistryDTO;
public interface ProcessGroupDAO {
@@ -65,12 +68,32 @@ public interface ProcessGroupDAO {
void verifyScheduleComponents(String groupId, ScheduledState state, Set<String> componentIds);
/**
+ * Verifies that the specified controller services can be transitioned to the desired state.
+ *
+ * @param groupId the ID of the process group
+ * @param state the desired state
+ * @param serviceIds the IDs of the controller services
+ */
+ void verifyActivateControllerServices(String groupId, ControllerServiceState state, Set<String> serviceIds);
+
+ /**
* Schedules the components in the specified process group.
*
* @param groupId id
* @param state scheduled state
+ * @param componentIds the IDs of the components to schedule
+ *
+ * @return a Future that can be used to wait for the components to finish starting or stopping
*/
- void scheduleComponents(String groupId, ScheduledState state, Set<String> componentIds);
+ Future<Void> scheduleComponents(String groupId, ScheduledState state, Set<String> componentIds);
+
+ /**
+ * Enables or disables the controller services in the specified process group
+ *
+ * @param groupId the id of the group
+ * @param state the desired state
+ * @param serviceIds the IDs of the services to enable or disable
+ *
+ * @return a Future; presumably it completes once the services have finished enabling or disabling (confirm against the implementation)
+ */
+ Future<Void> activateControllerServices(String groupId, ControllerServiceState state, Set<String> serviceIds);
/**
* Updates the specified process group.
@@ -81,6 +104,21 @@ public interface ProcessGroupDAO {
ProcessGroup updateProcessGroup(ProcessGroupDTO processGroup);
/**
+ * Updates the specified variable registry.
+ *
+ * @param variableRegistry the DTO describing the Variable Registry to apply
+ * @return the Process Group that was updated
+ */
+ ProcessGroup updateVariableRegistry(VariableRegistryDTO variableRegistry);
+
+ /**
+ * Verifies that the specified updates to a current Process Group can be applied at this time.
+ *
+ * @param processGroup the DTO that describes the changes to occur
+ */
+ void verifyUpdate(ProcessGroupDTO processGroup);
+
+ /**
* Verifies the specified process group can be removed.
*
* @param groupId id
http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
index 36cf85b..0f9ec7a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
@@ -124,7 +124,7 @@ public class StandardControllerServiceDAO extends ComponentDAO implements Contro
}
@Override
- public Set<ControllerServiceNode> getControllerServices(final String groupId) {
+ public Set<ControllerServiceNode> getControllerServices(final String groupId, final boolean includeAncestorGroups, final boolean includeDescendantGroups) {
if (groupId == null) {
return flowController.getRootControllerServices();
} else {
@@ -134,7 +134,12 @@ public class StandardControllerServiceDAO extends ComponentDAO implements Contro
throw new ResourceNotFoundException("Could not find Process Group with ID " + groupId);
}
- return procGroup.getControllerServices(true);
+ final Set<ControllerServiceNode> serviceNodes = procGroup.getControllerServices(includeAncestorGroups);
+ if (includeDescendantGroups) {
+ serviceNodes.addAll(procGroup.findAllControllerServices());
+ }
+
+ return serviceNodes;
}
}