You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2017/09/14 15:13:06 UTC
[3/4] nifi git commit: NIFI-4280: - Adding support for the user to
configure variables in the UI. - Updating the endpoints for changing
variables as necessary. This closes #2135.
http://git-wip-us.apache.org/repos/asf/nifi/blob/eac47e90/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
index 0b634a1..0e574e0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
@@ -16,56 +16,18 @@
*/
package org.apache.nifi.web.api;
-import java.io.InputStream;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DELETE;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.GET;
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.POST;
-import javax.ws.rs.PUT;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.Response.Status;
-import javax.ws.rs.core.UriBuilder;
-import javax.ws.rs.core.UriInfo;
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBElement;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Unmarshaller;
-import javax.xml.transform.stream.StreamSource;
-
+import com.sun.jersey.api.core.ResourceContext;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
+import com.sun.jersey.multipart.FormDataParam;
+import com.wordnik.swagger.annotations.Api;
+import com.wordnik.swagger.annotations.ApiOperation;
+import com.wordnik.swagger.annotations.ApiParam;
+import com.wordnik.swagger.annotations.ApiResponse;
+import com.wordnik.swagger.annotations.ApiResponses;
+import com.wordnik.swagger.annotations.Authorization;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.AuthorizableLookup;
+import org.apache.nifi.authorization.AuthorizeAccess;
import org.apache.nifi.authorization.AuthorizeControllerServiceReference;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.authorization.ComponentAuthorizable;
@@ -75,19 +37,18 @@ import org.apache.nifi.authorization.SnippetAuthorizable;
import org.apache.nifi.authorization.TemplateContentsAuthorizable;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.authorization.user.NiFiUserDetails;
import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.serialization.FlowEncodingVersion;
import org.apache.nifi.controller.service.ControllerServiceState;
-import org.apache.nifi.framework.security.util.SslContextFactory;
import org.apache.nifi.registry.variable.VariableRegistryUpdateRequest;
import org.apache.nifi.registry.variable.VariableRegistryUpdateStep;
import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
import org.apache.nifi.util.BundleUtils;
-import org.apache.nifi.util.FormatUtils;
-import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.Revision;
@@ -106,9 +67,9 @@ import org.apache.nifi.web.api.dto.RevisionDTO;
import org.apache.nifi.web.api.dto.TemplateDTO;
import org.apache.nifi.web.api.dto.VariableRegistryDTO;
import org.apache.nifi.web.api.dto.flow.FlowDTO;
-import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO;
-import org.apache.nifi.web.api.dto.status.ProcessGroupStatusSnapshotDTO;
+import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO;
import org.apache.nifi.web.api.entity.ActivateControllerServicesEntity;
+import org.apache.nifi.web.api.entity.AffectedComponentEntity;
import org.apache.nifi.web.api.entity.ConnectionEntity;
import org.apache.nifi.web.api.entity.ConnectionsEntity;
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
@@ -125,8 +86,6 @@ import org.apache.nifi.web.api.entity.LabelsEntity;
import org.apache.nifi.web.api.entity.OutputPortsEntity;
import org.apache.nifi.web.api.entity.PortEntity;
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
-import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity;
-import org.apache.nifi.web.api.entity.ProcessGroupStatusSnapshotEntity;
import org.apache.nifi.web.api.entity.ProcessGroupsEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.apache.nifi.web.api.entity.ProcessorsEntity;
@@ -138,23 +97,61 @@ import org.apache.nifi.web.api.entity.VariableRegistryEntity;
import org.apache.nifi.web.api.entity.VariableRegistryUpdateRequestEntity;
import org.apache.nifi.web.api.request.ClientIdParameter;
import org.apache.nifi.web.api.request.LongParameter;
+import org.apache.nifi.web.security.token.NiFiAuthenticationToken;
import org.apache.nifi.web.util.Pause;
-import org.apache.nifi.web.util.WebUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.security.core.Authentication;
+import org.springframework.security.core.context.SecurityContextHolder;
-import com.sun.jersey.api.client.Client;
-import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.config.ClientConfig;
-import com.sun.jersey.api.client.config.DefaultClientConfig;
-import com.sun.jersey.api.core.ResourceContext;
-import com.sun.jersey.multipart.FormDataParam;
-import com.wordnik.swagger.annotations.Api;
-import com.wordnik.swagger.annotations.ApiOperation;
-import com.wordnik.swagger.annotations.ApiParam;
-import com.wordnik.swagger.annotations.ApiResponse;
-import com.wordnik.swagger.annotations.ApiResponses;
-import com.wordnik.swagger.annotations.Authorization;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import javax.ws.rs.core.UriBuilder;
+import javax.ws.rs.core.UriInfo;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.transform.stream.StreamSource;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
/**
* RESTful endpoint for managing a Group.
@@ -455,14 +452,12 @@ public class ProcessGroupResource extends ApplicationResource {
throw new IllegalArgumentException("Group ID and Update ID must both be specified.");
}
- if (isReplicateRequest()) {
- return replicate(HttpMethod.GET);
- }
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
// authorize access
serviceFacade.authorizeAccess(lookup -> {
final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable();
- processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
+ processGroup.authorize(authorizer, RequestAction.READ, user);
});
final VariableRegistryUpdateRequest request = varRegistryUpdateRequests.get(updateId);
@@ -474,9 +469,14 @@ public class ProcessGroupResource extends ApplicationResource {
throw new ResourceNotFoundException("Could not find a Variable Registry Update Request with identifier " + updateId + " for Process Group with identifier " + groupId);
}
+ if (!user.equals(request.getUser())) {
+ throw new IllegalArgumentException("Only the user that submitted the update request can retrieve it.");
+ }
+
final VariableRegistryUpdateRequestEntity entity = new VariableRegistryUpdateRequestEntity();
- entity.setRequestDto(dtoFactory.createVariableRegistryUpdateRequestDto(request));
- entity.getRequestDto().setUri(generateResourceUri("process-groups", groupId, "variable-registry", updateId));
+ entity.setRequest(dtoFactory.createVariableRegistryUpdateRequestDto(request));
+ entity.setProcessGroupRevision(request.getProcessGroupRevision());
+ entity.getRequest().setUri(generateResourceUri("process-groups", groupId, "variable-registry", updateId));
return generateOkResponse(entity).build();
}
@@ -506,15 +506,13 @@ public class ProcessGroupResource extends ApplicationResource {
throw new IllegalArgumentException("Group ID and Update ID must both be specified.");
}
- if (isReplicateRequest()) {
- return replicate(HttpMethod.DELETE);
- }
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
// authorize access
serviceFacade.authorizeAccess(lookup -> {
final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable();
- processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
- processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
+ processGroup.authorize(authorizer, RequestAction.READ, user);
+ processGroup.authorize(authorizer, RequestAction.WRITE, user);
});
final VariableRegistryUpdateRequest request = varRegistryUpdateRequests.remove(updateId);
@@ -526,11 +524,16 @@ public class ProcessGroupResource extends ApplicationResource {
throw new ResourceNotFoundException("Could not find a Variable Registry Update Request with identifier " + updateId + " for Process Group with identifier " + groupId);
}
+ if (!user.equals(request.getUser())) {
+ throw new IllegalArgumentException("Only the user that submitted the update request can remove it.");
+ }
+
request.cancel();
final VariableRegistryUpdateRequestEntity entity = new VariableRegistryUpdateRequestEntity();
- entity.setRequestDto(dtoFactory.createVariableRegistryUpdateRequestDto(request));
- entity.getRequestDto().setUri(generateResourceUri("process-groups", groupId, "variable-registry", updateId));
+ entity.setRequest(dtoFactory.createVariableRegistryUpdateRequestDto(request));
+ entity.setProcessGroupRevision(request.getProcessGroupRevision());
+ entity.getRequest().setUri(generateResourceUri("process-groups", groupId, "variable-registry", updateId));
return generateOkResponse(entity).build();
}
@@ -634,11 +637,11 @@ public class ProcessGroupResource extends ApplicationResource {
// 1. Determine Affected Components (this includes any Processors and Controller Services and any components that reference an affected Controller Service).
// 1a. Determine ID's of components
// 1b. Determine Revision's of associated components
- // 2. Stop All Affected Processors
- // 3. Disable All Affected Controller Services
+ // 2. Stop All Active Affected Processors
+ // 3. Disable All Active Affected Controller Services
// 4. Update the Variables
- // 5. Re-Enable all Affected Controller Services (services only, not dependent components)
- // 6. Re-Enable all Processors that Depended on the Controller Services
+ // 5. Re-Enable all previously Active Affected Controller Services (services only, not dependent components)
+ // 6. Re-Enable all previously Active Processors that Depended on the Controller Services
// Determine the affected components (and their associated revisions)
final VariableRegistryEntity computedEntity = serviceFacade.populateAffectedComponents(requestEntity.getVariableRegistry());
@@ -647,38 +650,77 @@ public class ProcessGroupResource extends ApplicationResource {
throw new ResourceNotFoundException(String.format("Unable to locate group with id '%s'.", groupId));
}
- final Set<AffectedComponentDTO> affectedComponents = serviceFacade.getComponentsAffectedByVariableRegistryUpdate(requestEntity.getVariableRegistry());
+ final Set<AffectedComponentEntity> allAffectedComponents = serviceFacade.getComponentsAffectedByVariableRegistryUpdate(requestEntity.getVariableRegistry());
+ final Set<AffectedComponentDTO> activeAffectedComponents = serviceFacade.getActiveComponentsAffectedByVariableRegistryUpdate(requestEntity.getVariableRegistry());
- final Map<String, List<AffectedComponentDTO>> affectedComponentsByType = affectedComponents.stream()
- .collect(Collectors.groupingBy(comp -> comp.getComponentType()));
+ final Map<String, List<AffectedComponentDTO>> activeAffectedComponentsByType = activeAffectedComponents.stream()
+ .collect(Collectors.groupingBy(comp -> comp.getReferenceType()));
- final List<AffectedComponentDTO> affectedProcessors = affectedComponentsByType.get(AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR);
- final List<AffectedComponentDTO> affectedServices = affectedComponentsByType.get(AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE);
+ final List<AffectedComponentDTO> activeAffectedProcessors = activeAffectedComponentsByType.get(AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR);
+ final List<AffectedComponentDTO> activeAffectedServices = activeAffectedComponentsByType.get(AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE);
+ final NiFiUser user = NiFiUserUtils.getNiFiUser();
+
+ // define access authorize for execution below
+ final AuthorizeAccess authorizeAccess = lookup -> {
+ final Authorizable groupAuthorizable = lookup.getProcessGroup(groupId).getAuthorizable();
+ groupAuthorizable.authorize(authorizer, RequestAction.WRITE, user);
+
+ // For every component that is affected, the user must have READ permissions and WRITE permissions
+ // (because this action requires stopping the component).
+ if (activeAffectedProcessors != null) {
+ for (final AffectedComponentDTO activeAffectedComponent : activeAffectedProcessors) {
+ final Authorizable authorizable = lookup.getProcessor(activeAffectedComponent.getId()).getAuthorizable();
+ authorizable.authorize(authorizer, RequestAction.READ, user);
+ authorizable.authorize(authorizer, RequestAction.WRITE, user);
+ }
+ }
+
+ if (activeAffectedServices != null) {
+ for (final AffectedComponentDTO activeAffectedComponent : activeAffectedServices) {
+ final Authorizable authorizable = lookup.getControllerService(activeAffectedComponent.getId()).getAuthorizable();
+ authorizable.authorize(authorizer, RequestAction.READ, user);
+ authorizable.authorize(authorizer, RequestAction.WRITE, user);
+ }
+ }
+ };
if (isReplicateRequest()) {
+ // authorize access
+ serviceFacade.authorizeAccess(authorizeAccess);
+
// update the variable registry
- final VariableRegistryUpdateRequest updateRequest = createVariableRegistryUpdateRequest(groupId);
+ final VariableRegistryUpdateRequest updateRequest = createVariableRegistryUpdateRequest(groupId, allAffectedComponents, user);
updateRequest.getIdentifyRelevantComponentsStep().setComplete(true);
final URI originalUri = getAbsolutePath();
// Submit the task to be run in the background
final Runnable taskWrapper = () -> {
try {
- updateVariableRegistryReplicated(groupId, originalUri, affectedProcessors, affectedServices, updateRequest, requestEntity);
+ // set the user authentication token
+ final Authentication authentication = new NiFiAuthenticationToken(new NiFiUserDetails(user));
+ SecurityContextHolder.getContext().setAuthentication(authentication);
+
+ updateVariableRegistryReplicated(groupId, originalUri, activeAffectedProcessors, activeAffectedServices, updateRequest, requestEntity);
} catch (final Exception e) {
logger.error("Failed to update variable registry", e);
+
+ updateRequest.setComplete(true);
updateRequest.setFailureReason("An unexpected error has occurred: " + e);
+ } finally {
+ // clear the authentication token
+ SecurityContextHolder.getContext().setAuthentication(null);
}
};
variableRegistryThreadPool.submit(taskWrapper);
final VariableRegistryUpdateRequestEntity responseEntity = new VariableRegistryUpdateRequestEntity();
- responseEntity.setRequestDto(dtoFactory.createVariableRegistryUpdateRequestDto(updateRequest));
- responseEntity.getRequestDto().setUri(generateResourceUri("process-groups", groupId, "variable-registry", "update-requests", updateRequest.getRequestId()));
+ responseEntity.setRequest(dtoFactory.createVariableRegistryUpdateRequestDto(updateRequest));
+ responseEntity.setProcessGroupRevision(updateRequest.getProcessGroupRevision());
+ responseEntity.getRequest().setUri(generateResourceUri("process-groups", groupId, "variable-registry", "update-requests", updateRequest.getRequestId()));
- final URI location = URI.create(responseEntity.getRequestDto().getUri());
+ final URI location = URI.create(responseEntity.getRequest().getUri());
return Response.status(Status.ACCEPTED).location(location).entity(responseEntity).build();
}
@@ -688,34 +730,10 @@ public class ProcessGroupResource extends ApplicationResource {
serviceFacade,
requestEntity,
requestRevision,
- lookup -> {
- final NiFiUser user = NiFiUserUtils.getNiFiUser();
-
- final Authorizable groupAuthorizable = lookup.getProcessGroup(groupId).getAuthorizable();
- groupAuthorizable.authorize(authorizer, RequestAction.WRITE, user);
-
- // For every component that is affected, the user must have READ permissions and WRITE permissions
- // (because this action requires stopping the component).
- if (affectedProcessors != null) {
- for (final AffectedComponentDTO affectedComponent : affectedProcessors) {
- final Authorizable authorizable = lookup.getProcessor(affectedComponent.getComponentId()).getAuthorizable();
- authorizable.authorize(authorizer, RequestAction.READ, user);
- authorizable.authorize(authorizer, RequestAction.WRITE, user);
- }
- }
-
- if (affectedServices != null) {
- for (final AffectedComponentDTO affectedComponent : affectedServices) {
- final Authorizable authorizable = lookup.getControllerService(affectedComponent.getComponentId()).getAuthorizable();
- authorizable.authorize(authorizer, RequestAction.READ, user);
- authorizable.authorize(authorizer, RequestAction.WRITE, user);
- }
- }
- },
+ authorizeAccess,
null,
- (revision, varRegistryEntity) -> {
- return updateVariableRegistryLocal(groupId, affectedProcessors, affectedServices, requestEntity);
- });
+ (revision, varRegistryEntity) -> updateVariableRegistryLocal(groupId, allAffectedComponents, activeAffectedProcessors, activeAffectedServices, user, requestEntity)
+ );
}
private Pause createPause(final VariableRegistryUpdateRequest updateRequest) {
@@ -739,58 +757,49 @@ public class ProcessGroupResource extends ApplicationResource {
}
private void updateVariableRegistryReplicated(final String groupId, final URI originalUri, final Collection<AffectedComponentDTO> affectedProcessors,
- final Collection<AffectedComponentDTO> affectedServices,
- final VariableRegistryUpdateRequest updateRequest, final VariableRegistryEntity requestEntity) {
-
- final NiFiProperties properties = getProperties();
- final Client jerseyClient = WebUtils.createClient(new DefaultClientConfig(), SslContextFactory.createSslContext(properties));
- final int connectionTimeout = (int) FormatUtils.getTimeDuration(properties.getClusterNodeConnectionTimeout(), TimeUnit.MILLISECONDS);
- final int readTimeout = (int) FormatUtils.getTimeDuration(properties.getClusterNodeReadTimeout(), TimeUnit.MILLISECONDS);
- jerseyClient.getProperties().put(ClientConfig.PROPERTY_CONNECT_TIMEOUT, connectionTimeout);
- jerseyClient.getProperties().put(ClientConfig.PROPERTY_READ_TIMEOUT, readTimeout);
- jerseyClient.getProperties().put(ClientConfig.PROPERTY_FOLLOW_REDIRECTS, Boolean.TRUE);
+ final Collection<AffectedComponentDTO> affectedServices, final VariableRegistryUpdateRequest updateRequest,
+ final VariableRegistryEntity requestEntity) throws InterruptedException, IOException {
final Pause pause = createPause(updateRequest);
// stop processors
if (affectedProcessors != null) {
- logger.info("In order to update Variable Registry for Process Group with ID {}, "
- + "replicating request to stop {} affected processors", groupId, affectedProcessors.size());
-
- scheduleProcessors(groupId, originalUri, jerseyClient, updateRequest, pause,
- affectedProcessors, ScheduledState.STOPPED, updateRequest.getStopProcessorsStep());
+ logger.info("In order to update Variable Registry for Process Group with ID {}, replicating request to stop {} affected Processors", groupId, affectedProcessors.size());
+ scheduleProcessors(groupId, originalUri, updateRequest, pause, affectedProcessors, ScheduledState.STOPPED, updateRequest.getStopProcessorsStep());
+ } else {
+ logger.info("In order to update Variable Registry for Process Group with ID {}, no Processors are affected.", groupId);
+ updateRequest.getStopProcessorsStep().setComplete(true);
}
// disable controller services
if (affectedServices != null) {
- logger.info("In order to update Variable Registry for Process Group with ID {}, "
- + "replicating request to stop {} affected Controller Services", groupId, affectedServices.size());
-
- activateControllerServices(groupId, originalUri, jerseyClient, updateRequest, pause,
- affectedServices, ControllerServiceState.DISABLED, updateRequest.getDisableServicesStep());
+ logger.info("In order to update Variable Registry for Process Group with ID {}, replicating request to stop {} affected Controller Services", groupId, affectedServices.size());
+ activateControllerServices(groupId, originalUri, updateRequest, pause, affectedServices, ControllerServiceState.DISABLED, updateRequest.getDisableServicesStep());
+ } else {
+ logger.info("In order to update Variable Registry for Process Group with ID {}, no Controller Services are affected.", groupId);
+ updateRequest.getDisableServicesStep().setComplete(true);
}
// apply updates
- logger.info("In order to update Variable Registry for Process Group with ID {}, "
- + "replicating request to apply updates to variable registry", groupId);
- applyVariableRegistryUpdate(groupId, originalUri, jerseyClient, updateRequest, requestEntity);
+ logger.info("In order to update Variable Registry for Process Group with ID {}, replicating request to apply updates to variable registry", groupId);
+ applyVariableRegistryUpdate(groupId, originalUri, updateRequest, requestEntity);
// re-enable controller services
if (affectedServices != null) {
- logger.info("In order to update Variable Registry for Process Group with ID {}, "
- + "replicating request to re-enable {} affected services", groupId, affectedServices.size());
-
- activateControllerServices(groupId, originalUri, jerseyClient, updateRequest, pause,
- affectedServices, ControllerServiceState.ENABLED, updateRequest.getEnableServicesStep());
+ logger.info("In order to update Variable Registry for Process Group with ID {}, replicating request to re-enable {} affected services", groupId, affectedServices.size());
+ activateControllerServices(groupId, originalUri, updateRequest, pause, affectedServices, ControllerServiceState.ENABLED, updateRequest.getEnableServicesStep());
+ } else {
+ logger.info("In order to update Variable Registry for Process Group with ID {}, no Controller Services are affected.", groupId);
+ updateRequest.getEnableServicesStep().setComplete(true);
}
// restart processors
if (affectedProcessors != null) {
- logger.info("In order to update Variable Registry for Process Group with ID {}, "
- + "replicating request to restart {} affected processors", groupId, affectedProcessors.size());
-
- scheduleProcessors(groupId, originalUri, jerseyClient, updateRequest, pause,
- affectedProcessors, ScheduledState.RUNNING, updateRequest.getStartProcessorsStep());
+ logger.info("In order to update Variable Registry for Process Group with ID {}, replicating request to restart {} affected processors", groupId, affectedProcessors.size());
+ scheduleProcessors(groupId, originalUri, updateRequest, pause, affectedProcessors, ScheduledState.RUNNING, updateRequest.getStartProcessorsStep());
+ } else {
+ logger.info("In order to update Variable Registry for Process Group with ID {}, no Processors are affected.", groupId);
+ updateRequest.getStartProcessorsStep().setComplete(true);
}
updateRequest.setComplete(true);
@@ -799,34 +808,45 @@ public class ProcessGroupResource extends ApplicationResource {
/**
* Periodically polls the process group with the given ID, waiting for all processors whose ID's are given to have the given Scheduled State.
*
- * @param client the Jersey Client to use for making the request
* @param groupId the ID of the Process Group to poll
* @param processorIds the ID of all Processors whose state should be equal to the given desired state
* @param desiredState the desired state for all processors with the ID's given
* @param pause the Pause that can be used to wait between polling
* @return <code>true</code> if successful, <code>false</code> if unable to wait for processors to reach the desired state
*/
- private boolean waitForProcessorStatus(final Client client, final URI originalUri, final String groupId, final Set<String> processorIds, final ScheduledState desiredState, final Pause pause) {
+ private boolean waitForProcessorStatus(final URI originalUri, final String groupId, final Set<String> processorIds, final ScheduledState desiredState,
+ final VariableRegistryUpdateRequest updateRequest, final Pause pause) throws InterruptedException {
URI groupUri;
try {
groupUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(),
- originalUri.getPort(), "/nifi-api/flow/process-groups/" + groupId + "/status", "recursive=true", originalUri.getFragment());
+ originalUri.getPort(), "/nifi-api/process-groups/" + groupId + "/processors", "includeDescendantGroups=true", originalUri.getFragment());
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
+ final Map<String, String> headers = new HashMap<>();
+ final MultivaluedMap<String, String> requestEntity = new MultivaluedMapImpl();
+
boolean continuePolling = true;
while (continuePolling) {
- final ClientResponse response = client.resource(groupUri).header("Content-Type", "application/json").get(ClientResponse.class);
- if (response.getStatus() != Status.OK.getStatusCode()) {
+
+ // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly to the cluster nodes themselves.
+ final NodeResponse clusterResponse;
+ if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
+ clusterResponse = getRequestReplicator().replicate(HttpMethod.GET, groupUri, requestEntity, headers).awaitMergedResponse();
+ } else {
+ clusterResponse = getRequestReplicator().forwardToCoordinator(
+ getClusterCoordinatorNode(), HttpMethod.GET, groupUri, requestEntity, headers).awaitMergedResponse();
+ }
+
+ if (clusterResponse.getStatus() != Status.OK.getStatusCode()) {
return false;
}
- final ProcessGroupStatusEntity statusEntity = response.getEntity(ProcessGroupStatusEntity.class);
- final ProcessGroupStatusDTO statusDto = statusEntity.getProcessGroupStatus();
- final ProcessGroupStatusSnapshotDTO statusSnapshotDto = statusDto.getAggregateSnapshot();
+ final ProcessorsEntity processorsEntity = getResponseEntity(clusterResponse, ProcessorsEntity.class);
+ final Set<ProcessorEntity> processorEntities = processorsEntity.getProcessors();
- if (isProcessorStatusEqual(statusSnapshotDto, processorIds, desiredState)) {
+ if (isProcessorActionComplete(processorEntities, updateRequest, processorIds, desiredState)) {
logger.debug("All {} processors of interest now have the desired state of {}", processorIds.size(), desiredState);
return true;
}
@@ -847,14 +867,14 @@ public class ProcessGroupResource extends ApplicationResource {
* @param pause the Pause that can be used to wait between polling
* @return <code>true</code> if successful, <code>false</code> if unable to wait for processors to reach the desired state
*/
- private boolean waitForLocalProcessorStatus(final String groupId, final Set<String> processorIds, final ScheduledState desiredState, final Pause pause) {
+ private boolean waitForLocalProcessor(final String groupId, final Set<String> processorIds, final ScheduledState desiredState,
+ final VariableRegistryUpdateRequest updateRequest, final Pause pause) {
+
boolean continuePolling = true;
while (continuePolling) {
- final ProcessGroupStatusEntity statusEntity = serviceFacade.getProcessGroupStatus(groupId, true);
- final ProcessGroupStatusDTO statusDto = statusEntity.getProcessGroupStatus();
- final ProcessGroupStatusSnapshotDTO statusSnapshotDto = statusDto.getAggregateSnapshot();
+ final Set<ProcessorEntity> processorEntities = serviceFacade.getProcessors(groupId, true);
- if (isProcessorStatusEqual(statusSnapshotDto, processorIds, desiredState)) {
+ if (isProcessorActionComplete(processorEntities, updateRequest, processorIds, desiredState)) {
logger.debug("All {} processors of interest now have the desired state of {}", processorIds.size(), desiredState);
return true;
}
@@ -866,55 +886,93 @@ public class ProcessGroupResource extends ApplicationResource {
return false;
}
- private boolean isProcessorStatusEqual(final ProcessGroupStatusSnapshotDTO statusSnapshot, final Set<String> processorIds, final ScheduledState desiredState) {
+ private boolean isProcessorActionComplete(final Set<ProcessorEntity> processorEntities, final VariableRegistryUpdateRequest updateRequest,
+ final Set<String> processorIds, final ScheduledState desiredState) {
+
final String desiredStateName = desiredState.name();
- final boolean allProcessorsMatch = statusSnapshot.getProcessorStatusSnapshots().stream()
- .map(entity -> entity.getProcessorStatusSnapshot())
- .filter(status -> processorIds.contains(status.getId()))
- .allMatch(status -> {
- final String runStatus = status.getRunStatus();
- final boolean stateMatches = desiredStateName.equalsIgnoreCase(runStatus);
- if (!stateMatches) {
- return false;
- }
+ // update the affected processors
+ processorEntities.stream()
+ .filter(entity -> updateRequest.getAffectedComponents().containsKey(entity.getId()))
+ .forEach(entity -> {
+ final AffectedComponentEntity affectedComponentEntity = updateRequest.getAffectedComponents().get(entity.getId());
+ affectedComponentEntity.setRevision(entity.getRevision());
+
+ // only consider update this component if the user had permissions to it
+ if (Boolean.TRUE.equals(affectedComponentEntity.getPermissions().getCanRead())) {
+ final AffectedComponentDTO affectedComponent = affectedComponentEntity.getComponent();
+ affectedComponent.setState(entity.getStatus().getAggregateSnapshot().getRunStatus());
+ affectedComponent.setActiveThreadCount(entity.getStatus().getAggregateSnapshot().getActiveThreadCount());
+
+ if (Boolean.TRUE.equals(entity.getPermissions().getCanRead())) {
+ affectedComponent.setValidationErrors(entity.getComponent().getValidationErrors());
+ }
+ }
+ });
- if (desiredState == ScheduledState.STOPPED && status.getActiveThreadCount() != 0) {
- return false;
- }
+ final boolean allProcessorsMatch = processorEntities.stream()
+ .filter(entity -> processorIds.contains(entity.getId()))
+ .allMatch(entity -> {
+ final ProcessorStatusDTO status = entity.getStatus();
- return true;
- });
+ final String runStatus = status.getAggregateSnapshot().getRunStatus();
+ final boolean stateMatches = desiredStateName.equalsIgnoreCase(runStatus);
+ if (!stateMatches) {
+ return false;
+ }
+
+ if (desiredState == ScheduledState.STOPPED && status.getAggregateSnapshot().getActiveThreadCount() != 0) {
+ return false;
+ }
+
+ return true;
+ });
if (!allProcessorsMatch) {
return false;
}
- for (final ProcessGroupStatusSnapshotEntity childGroupEntity : statusSnapshot.getProcessGroupStatusSnapshots()) {
- final ProcessGroupStatusSnapshotDTO childGroupStatus = childGroupEntity.getProcessGroupStatusSnapshot();
- final boolean allMatchChildLevel = isProcessorStatusEqual(childGroupStatus, processorIds, desiredState);
- if (!allMatchChildLevel) {
- return false;
- }
- }
-
return true;
}
-
+ /**
+ * Updates the affected controller services in the specified updateRequest with the serviceEntities.
+ *
+ * @param serviceEntities service entities
+ * @param updateRequest update request
+ */
+ private void updateAffectedControllerServices(final Set<ControllerServiceEntity> serviceEntities, final VariableRegistryUpdateRequest updateRequest) {
+ // update the affected components
+ serviceEntities.stream()
+ .filter(entity -> updateRequest.getAffectedComponents().containsKey(entity.getId()))
+ .forEach(entity -> {
+ final AffectedComponentEntity affectedComponentEntity = updateRequest.getAffectedComponents().get(entity.getId());
+ affectedComponentEntity.setRevision(entity.getRevision());
+
+ // only consider update this component if the user had permissions to it
+ if (Boolean.TRUE.equals(affectedComponentEntity.getPermissions().getCanRead())) {
+ final AffectedComponentDTO affectedComponent = affectedComponentEntity.getComponent();
+ affectedComponent.setState(entity.getComponent().getState());
+
+ if (Boolean.TRUE.equals(entity.getPermissions().getCanRead())) {
+ affectedComponent.setValidationErrors(entity.getComponent().getValidationErrors());
+ }
+ }
+ });
+ }
/**
* Periodically polls the process group with the given ID, waiting for all controller services whose ID's are given to have the given Controller Service State.
*
- * @param client the Jersey Client to use for making the HTTP Request
* @param groupId the ID of the Process Group to poll
* @param serviceIds the ID of all Controller Services whose state should be equal to the given desired state
* @param desiredState the desired state for all services with the ID's given
* @param pause the Pause that can be used to wait between polling
* @return <code>true</code> if successful, <code>false</code> if unable to wait for services to reach the desired state
*/
- private boolean waitForControllerServiceStatus(final Client client, final URI originalUri, final String groupId, final Set<String> serviceIds, final ControllerServiceState desiredState,
- final Pause pause) {
+ private boolean waitForControllerServiceStatus(final URI originalUri, final String groupId, final Set<String> serviceIds,
+ final ControllerServiceState desiredState, final VariableRegistryUpdateRequest updateRequest, final Pause pause) throws InterruptedException {
+
URI groupUri;
try {
groupUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(),
@@ -923,16 +981,31 @@ public class ProcessGroupResource extends ApplicationResource {
throw new RuntimeException(e);
}
+ final Map<String, String> headers = new HashMap<>();
+ final MultivaluedMap<String, String> requestEntity = new MultivaluedMapImpl();
+
boolean continuePolling = true;
while (continuePolling) {
- final ClientResponse response = client.resource(groupUri).header("Content-Type", "application/json").get(ClientResponse.class);
- if (response.getStatus() != Status.OK.getStatusCode()) {
+
+ // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly to the cluster nodes themselves.
+ final NodeResponse clusterResponse;
+ if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
+ clusterResponse = getRequestReplicator().replicate(HttpMethod.GET, groupUri, requestEntity, headers).awaitMergedResponse();
+ } else {
+ clusterResponse = getRequestReplicator().forwardToCoordinator(
+ getClusterCoordinatorNode(), HttpMethod.GET, groupUri, requestEntity, headers).awaitMergedResponse();
+ }
+
+ if (clusterResponse.getStatus() != Status.OK.getStatusCode()) {
return false;
}
- final ControllerServicesEntity controllerServicesEntity = response.getEntity(ControllerServicesEntity.class);
+ final ControllerServicesEntity controllerServicesEntity = getResponseEntity(clusterResponse, ControllerServicesEntity.class);
final Set<ControllerServiceEntity> serviceEntities = controllerServicesEntity.getControllerServices();
+ // update the affected controller services
+ updateAffectedControllerServices(serviceEntities, updateRequest);
+
final String desiredStateName = desiredState.name();
final boolean allServicesMatch = serviceEntities.stream()
.map(entity -> entity.getComponent())
@@ -963,11 +1036,16 @@ public class ProcessGroupResource extends ApplicationResource {
* @param user the user that is retrieving the controller services
* @return <code>true</code> if successful, <code>false</code> if unable to wait for services to reach the desired state
*/
- private boolean waitForLocalControllerServiceStatus(final String groupId, final Set<String> serviceIds, final ControllerServiceState desiredState, final Pause pause, final NiFiUser user) {
+ private boolean waitForLocalControllerServiceStatus(final String groupId, final Set<String> serviceIds, final ControllerServiceState desiredState,
+ final VariableRegistryUpdateRequest updateRequest, final Pause pause, final NiFiUser user) {
+
boolean continuePolling = true;
while (continuePolling) {
final Set<ControllerServiceEntity> serviceEntities = serviceFacade.getControllerServices(groupId, false, true, user);
+ // update the affected controller services
+ updateAffectedControllerServices(serviceEntities, updateRequest);
+
final String desiredStateName = desiredState.name();
final boolean allServicesMatch = serviceEntities.stream()
.map(entity -> entity.getComponent())
@@ -975,6 +1053,7 @@ public class ProcessGroupResource extends ApplicationResource {
.map(service -> service.getState())
.allMatch(state -> desiredStateName.equals(state));
+
if (allServicesMatch) {
logger.debug("All {} controller services of interest now have the desired state of {}", serviceIds.size(), desiredState);
return true;
@@ -987,8 +1066,8 @@ public class ProcessGroupResource extends ApplicationResource {
return false;
}
- private VariableRegistryUpdateRequest createVariableRegistryUpdateRequest(final String groupId) {
- final VariableRegistryUpdateRequest updateRequest = new VariableRegistryUpdateRequest(UUID.randomUUID().toString(), groupId);
+ private VariableRegistryUpdateRequest createVariableRegistryUpdateRequest(final String groupId, final Set<AffectedComponentEntity> affectedComponents, final NiFiUser user) {
+ final VariableRegistryUpdateRequest updateRequest = new VariableRegistryUpdateRequest(UUID.randomUUID().toString(), groupId, affectedComponents, user);
// before adding to the request map, purge any old requests. Must do this by creating a List of ID's
// and then removing those ID's one-at-a-time in order to avoid ConcurrentModificationException.
@@ -1011,27 +1090,26 @@ public class ProcessGroupResource extends ApplicationResource {
return updateRequest;
}
- private Response updateVariableRegistryLocal(final String groupId, final List<AffectedComponentDTO> affectedProcessors, final List<AffectedComponentDTO> affectedServices,
- final VariableRegistryEntity requestEntity) {
+ private Response updateVariableRegistryLocal(final String groupId, final Set<AffectedComponentEntity> affectedComponents, final List<AffectedComponentDTO> affectedProcessors,
+ final List<AffectedComponentDTO> affectedServices, final NiFiUser user, final VariableRegistryEntity requestEntity) {
final Set<String> affectedProcessorIds = affectedProcessors == null ? Collections.emptySet() : affectedProcessors.stream()
- .map(component -> component.getComponentId())
+ .map(component -> component.getId())
.collect(Collectors.toSet());
Map<String, Revision> processorRevisionMap = getRevisions(groupId, affectedProcessorIds);
final Set<String> affectedServiceIds = affectedServices == null ? Collections.emptySet() : affectedServices.stream()
- .map(component -> component.getComponentId())
+ .map(component -> component.getId())
.collect(Collectors.toSet());
Map<String, Revision> serviceRevisionMap = getRevisions(groupId, affectedServiceIds);
// update the variable registry
- final VariableRegistryUpdateRequest updateRequest = createVariableRegistryUpdateRequest(groupId);
+ final VariableRegistryUpdateRequest updateRequest = createVariableRegistryUpdateRequest(groupId, affectedComponents, user);
updateRequest.getIdentifyRelevantComponentsStep().setComplete(true);
final Pause pause = createPause(updateRequest);
final Revision requestRevision = getRevision(requestEntity.getProcessGroupRevision(), groupId);
- final NiFiUser user = NiFiUserUtils.getNiFiUser();
final Runnable updateTask = new Runnable() {
@Override
public void run() {
@@ -1052,21 +1130,26 @@ public class ProcessGroupResource extends ApplicationResource {
// Apply the updates
performUpdateVariableRegistryStep(groupId, updateRequest, updateRequest.getApplyUpdatesStep(), "Applying updates to Variable Registry",
- () -> serviceFacade.updateVariableRegistry(user, requestRevision, requestEntity.getVariableRegistry()));
+ () -> {
+ final VariableRegistryEntity entity = serviceFacade.updateVariableRegistry(user, requestRevision, requestEntity.getVariableRegistry());
+ updateRequest.setProcessGroupRevision(entity.getProcessGroupRevision());
+ });
// Re-enable the controller services
performUpdateVariableRegistryStep(groupId, updateRequest, updateRequest.getEnableServicesStep(), "Re-enabling Controller Services",
- () -> enableControllerServices(user, groupId, updatedServiceRevisionMap, pause));
+ () -> enableControllerServices(user, updateRequest, groupId, updatedServiceRevisionMap, pause));
// Restart processors
performUpdateVariableRegistryStep(groupId, updateRequest, updateRequest.getStartProcessorsStep(), "Restarting Processors",
- () -> startProcessors(user, groupId, updatedProcessorRevisionMap, pause));
+ () -> startProcessors(user, updateRequest, groupId, updatedProcessorRevisionMap, pause));
// Set complete
updateRequest.setComplete(true);
updateRequest.setLastUpdated(new Date());
} catch (final Exception e) {
logger.error("Failed to update Variable Registry for Proces Group with ID " + groupId, e);
+
+ updateRequest.setComplete(true);
updateRequest.setFailureReason("An unexpected error has occurred: " + e);
}
}
@@ -1076,10 +1159,11 @@ public class ProcessGroupResource extends ApplicationResource {
variableRegistryThreadPool.submit(updateTask);
final VariableRegistryUpdateRequestEntity responseEntity = new VariableRegistryUpdateRequestEntity();
- responseEntity.setRequestDto(dtoFactory.createVariableRegistryUpdateRequestDto(updateRequest));
- responseEntity.getRequestDto().setUri(generateResourceUri("process-groups", groupId, "variable-registry", "update-requests", updateRequest.getRequestId()));
+ responseEntity.setRequest(dtoFactory.createVariableRegistryUpdateRequestDto(updateRequest));
+ responseEntity.setProcessGroupRevision(updateRequest.getProcessGroupRevision());
+ responseEntity.getRequest().setUri(generateResourceUri("process-groups", groupId, "variable-registry", "update-requests", updateRequest.getRequestId()));
- final URI location = URI.create(responseEntity.getRequestDto().getUri());
+ final URI location = URI.create(responseEntity.getRequest().getUri());
return Response.status(Status.ACCEPTED).location(location).entity(responseEntity).build();
}
@@ -1103,11 +1187,12 @@ public class ProcessGroupResource extends ApplicationResource {
action.run();
step.setComplete(true);
} catch (final Exception e) {
- request.setComplete(true);
logger.error("Failed to update variable registry for Process Group with ID {}", groupId, e);
step.setComplete(true);
step.setFailureReason(e.getMessage());
+
+ request.setComplete(true);
request.setFailureReason("Failed to update Variable Registry because failed while performing step: " + stepDescription);
}
@@ -1123,21 +1208,21 @@ public class ProcessGroupResource extends ApplicationResource {
serviceFacade.verifyScheduleComponents(processGroupId, ScheduledState.STOPPED, processorRevisions.keySet());
serviceFacade.scheduleComponents(user, processGroupId, ScheduledState.STOPPED, processorRevisions);
- waitForLocalProcessorStatus(processGroupId, processorRevisions.keySet(), ScheduledState.STOPPED, pause);
+ waitForLocalProcessor(processGroupId, processorRevisions.keySet(), ScheduledState.STOPPED, updateRequest, pause);
}
- private void startProcessors(final NiFiUser user, final String processGroupId, final Map<String, Revision> processorRevisions, final Pause pause) {
+ private void startProcessors(final NiFiUser user, final VariableRegistryUpdateRequest request, final String processGroupId, final Map<String, Revision> processorRevisions, final Pause pause) {
if (processorRevisions.isEmpty()) {
return;
}
serviceFacade.verifyScheduleComponents(processGroupId, ScheduledState.RUNNING, processorRevisions.keySet());
serviceFacade.scheduleComponents(user, processGroupId, ScheduledState.RUNNING, processorRevisions);
- waitForLocalProcessorStatus(processGroupId, processorRevisions.keySet(), ScheduledState.RUNNING, pause);
+ waitForLocalProcessor(processGroupId, processorRevisions.keySet(), ScheduledState.RUNNING, request, pause);
}
private void disableControllerServices(final NiFiUser user, final VariableRegistryUpdateRequest updateRequest, final String processGroupId,
- final Map<String, Revision> serviceRevisions, final Pause pause) {
+ final Map<String, Revision> serviceRevisions, final Pause pause) {
if (serviceRevisions.isEmpty()) {
return;
@@ -1145,116 +1230,141 @@ public class ProcessGroupResource extends ApplicationResource {
serviceFacade.verifyActivateControllerServices(processGroupId, ControllerServiceState.DISABLED, serviceRevisions.keySet());
serviceFacade.activateControllerServices(user, processGroupId, ControllerServiceState.DISABLED, serviceRevisions);
- waitForLocalControllerServiceStatus(processGroupId, serviceRevisions.keySet(), ControllerServiceState.DISABLED, pause, user);
+ waitForLocalControllerServiceStatus(processGroupId, serviceRevisions.keySet(), ControllerServiceState.DISABLED, updateRequest, pause, user);
}
- private void enableControllerServices(final NiFiUser user, final String processGroupId, final Map<String, Revision> serviceRevisions, final Pause pause) {
+ private void enableControllerServices(final NiFiUser user, final VariableRegistryUpdateRequest updateRequest, final String processGroupId,
+ final Map<String, Revision> serviceRevisions, final Pause pause) {
+
if (serviceRevisions.isEmpty()) {
return;
}
serviceFacade.verifyActivateControllerServices(processGroupId, ControllerServiceState.ENABLED, serviceRevisions.keySet());
serviceFacade.activateControllerServices(user, processGroupId, ControllerServiceState.ENABLED, serviceRevisions);
- waitForLocalControllerServiceStatus(processGroupId, serviceRevisions.keySet(), ControllerServiceState.ENABLED, pause, user);
+ waitForLocalControllerServiceStatus(processGroupId, serviceRevisions.keySet(), ControllerServiceState.ENABLED, updateRequest, pause, user);
}
- private void scheduleProcessors(final String groupId, final URI originalUri, final Client jerseyClient, final VariableRegistryUpdateRequest updateRequest,
- final Pause pause, final Collection<AffectedComponentDTO> affectedProcessors, final ScheduledState desiredState, final VariableRegistryUpdateStep updateStep) {
+ private void scheduleProcessors(final String groupId, final URI originalUri, final VariableRegistryUpdateRequest updateRequest,
+ final Pause pause, final Collection<AffectedComponentDTO> affectedProcessors, final ScheduledState desiredState,
+ final VariableRegistryUpdateStep updateStep) throws InterruptedException {
+
final Set<String> affectedProcessorIds = affectedProcessors.stream()
- .map(component -> component.getComponentId())
+ .map(component -> component.getId())
.collect(Collectors.toSet());
final Map<String, Revision> processorRevisionMap = getRevisions(groupId, affectedProcessorIds);
final Map<String, RevisionDTO> processorRevisionDtoMap = processorRevisionMap.entrySet().stream().collect(
Collectors.toMap(Map.Entry::getKey, entry -> dtoFactory.createRevisionDTO(entry.getValue())));
- final ScheduleComponentsEntity stopProcessorsEntity = new ScheduleComponentsEntity();
- stopProcessorsEntity.setComponents(processorRevisionDtoMap);
- stopProcessorsEntity.setId(groupId);
- stopProcessorsEntity.setState(desiredState.name());
+ final ScheduleComponentsEntity scheduleProcessorsEntity = new ScheduleComponentsEntity();
+ scheduleProcessorsEntity.setComponents(processorRevisionDtoMap);
+ scheduleProcessorsEntity.setId(groupId);
+ scheduleProcessorsEntity.setState(desiredState.name());
- URI stopProcessorUri;
+ URI scheduleGroupUri;
try {
- stopProcessorUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(),
+ scheduleGroupUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(),
originalUri.getPort(), "/nifi-api/flow/process-groups/" + groupId, null, originalUri.getFragment());
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
- final ClientResponse stopProcessorResponse = jerseyClient.resource(stopProcessorUri)
- .header("Content-Type", "application/json")
- .entity(stopProcessorsEntity)
- .put(ClientResponse.class);
+ final Map<String, String> headers = new HashMap<>();
+ headers.put("content-type", MediaType.APPLICATION_JSON);
+
+ // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly to the cluster nodes themselves.
+ final NodeResponse clusterResponse;
+ if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
+ clusterResponse = getRequestReplicator().replicate(HttpMethod.PUT, scheduleGroupUri, scheduleProcessorsEntity, headers).awaitMergedResponse();
+ } else {
+ clusterResponse = getRequestReplicator().forwardToCoordinator(
+ getClusterCoordinatorNode(), HttpMethod.PUT, scheduleGroupUri, scheduleProcessorsEntity, headers).awaitMergedResponse();
+ }
- final int stopProcessorStatus = stopProcessorResponse.getStatus();
+ final int stopProcessorStatus = clusterResponse.getStatus();
if (stopProcessorStatus != Status.OK.getStatusCode()) {
updateRequest.getStopProcessorsStep().setFailureReason("Failed while " + updateStep.getDescription());
+
+ updateStep.setComplete(true);
updateRequest.setFailureReason("Failed while " + updateStep.getDescription());
return;
}
updateRequest.setLastUpdated(new Date());
- final boolean processorsTransitioned = waitForProcessorStatus(jerseyClient, originalUri, groupId, affectedProcessorIds, desiredState, pause);
- if (processorsTransitioned) {
- updateStep.setComplete(true);
- } else {
+ final boolean processorsTransitioned = waitForProcessorStatus(originalUri, groupId, affectedProcessorIds, desiredState, updateRequest, pause);
+ updateStep.setComplete(true);
+
+ if (!processorsTransitioned) {
updateStep.setFailureReason("Failed while " + updateStep.getDescription());
+
+ updateRequest.setComplete(true);
updateRequest.setFailureReason("Failed while " + updateStep.getDescription());
- return;
}
}
- private void activateControllerServices(final String groupId, final URI originalUri, final Client jerseyClient, final VariableRegistryUpdateRequest updateRequest,
- final Pause pause, final Collection<AffectedComponentDTO> affectedServices, final ControllerServiceState desiredState, final VariableRegistryUpdateStep updateStep) {
+ private void activateControllerServices(final String groupId, final URI originalUri, final VariableRegistryUpdateRequest updateRequest,
+ final Pause pause, final Collection<AffectedComponentDTO> affectedServices, final ControllerServiceState desiredState, final VariableRegistryUpdateStep updateStep) throws InterruptedException {
final Set<String> affectedServiceIds = affectedServices.stream()
- .map(component -> component.getComponentId())
+ .map(component -> component.getId())
.collect(Collectors.toSet());
final Map<String, Revision> serviceRevisionMap = getRevisions(groupId, affectedServiceIds);
final Map<String, RevisionDTO> serviceRevisionDtoMap = serviceRevisionMap.entrySet().stream().collect(
Collectors.toMap(Map.Entry::getKey, entry -> dtoFactory.createRevisionDTO(entry.getValue())));
- final ActivateControllerServicesEntity disableServicesEntity = new ActivateControllerServicesEntity();
- disableServicesEntity.setComponents(serviceRevisionDtoMap);
- disableServicesEntity.setId(groupId);
- disableServicesEntity.setState(desiredState.name());
+ final ActivateControllerServicesEntity activateServicesEntity = new ActivateControllerServicesEntity();
+ activateServicesEntity.setComponents(serviceRevisionDtoMap);
+ activateServicesEntity.setId(groupId);
+ activateServicesEntity.setState(desiredState.name());
- URI disableServicesUri;
+ URI controllerServicesUri;
try {
- disableServicesUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(),
+ controllerServicesUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(),
originalUri.getPort(), "/nifi-api/flow/process-groups/" + groupId + "/controller-services", null, originalUri.getFragment());
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
- final ClientResponse disableServicesResponse = jerseyClient.resource(disableServicesUri)
- .header("Content-Type", "application/json")
- .entity(disableServicesEntity)
- .put(ClientResponse.class);
+ final Map<String, String> headers = new HashMap<>();
+ headers.put("content-type", MediaType.APPLICATION_JSON);
- final int disableServicesStatus = disableServicesResponse.getStatus();
+ // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly to the cluster nodes themselves.
+ final NodeResponse clusterResponse;
+ if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
+ clusterResponse = getRequestReplicator().replicate(HttpMethod.PUT, controllerServicesUri, activateServicesEntity, headers).awaitMergedResponse();
+ } else {
+ clusterResponse = getRequestReplicator().forwardToCoordinator(
+ getClusterCoordinatorNode(), HttpMethod.PUT, controllerServicesUri, activateServicesEntity, headers).awaitMergedResponse();
+ }
+
+ final int disableServicesStatus = clusterResponse.getStatus();
if (disableServicesStatus != Status.OK.getStatusCode()) {
updateStep.setFailureReason("Failed while " + updateStep.getDescription());
+
+ updateStep.setComplete(true);
updateRequest.setFailureReason("Failed while " + updateStep.getDescription());
return;
}
updateRequest.setLastUpdated(new Date());
- if (waitForControllerServiceStatus(jerseyClient, originalUri, groupId, affectedServiceIds, desiredState, pause)) {
- updateStep.setComplete(true);
- } else {
+ final boolean serviceTransitioned = waitForControllerServiceStatus(originalUri, groupId, affectedServiceIds, desiredState, updateRequest, pause);
+ updateStep.setComplete(true);
+
+ if (!serviceTransitioned) {
updateStep.setFailureReason("Failed while " + updateStep.getDescription());
+
+ updateRequest.setComplete(true);
updateRequest.setFailureReason("Failed while " + updateStep.getDescription());
- return;
}
}
+ private void applyVariableRegistryUpdate(final String groupId, final URI originalUri, final VariableRegistryUpdateRequest updateRequest,
+ final VariableRegistryEntity updateEntity) throws InterruptedException, IOException {
- private void applyVariableRegistryUpdate(final String groupId, final URI originalUri, final Client jerseyClient, final VariableRegistryUpdateRequest updateRequest,
- final VariableRegistryEntity updateEntity) {
-
+ // convert request accordingly
URI applyUpdatesUri;
try {
applyUpdatesUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(),
@@ -1263,21 +1373,53 @@ public class ProcessGroupResource extends ApplicationResource {
throw new RuntimeException(e);
}
- final ClientResponse applyUpdatesResponse = jerseyClient.resource(applyUpdatesUri)
- .header("Content-Type", "application/json")
- .entity(updateEntity)
- .put(ClientResponse.class);
+ final Map<String, String> headers = new HashMap<>();
+ headers.put("content-type", MediaType.APPLICATION_JSON);
- final int applyUpdatesStatus = applyUpdatesResponse.getStatus();
+ // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly to the cluster nodes themselves.
+ final NodeResponse clusterResponse;
+ if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
+ clusterResponse = getRequestReplicator().replicate(HttpMethod.PUT, applyUpdatesUri, updateEntity, headers).awaitMergedResponse();
+ } else {
+ clusterResponse = getRequestReplicator().forwardToCoordinator(
+ getClusterCoordinatorNode(), HttpMethod.PUT, applyUpdatesUri, updateEntity, headers).awaitMergedResponse();
+ }
+
+ final int applyUpdatesStatus = clusterResponse.getStatus();
updateRequest.setLastUpdated(new Date());
- if (applyUpdatesStatus != Status.OK.getStatusCode()) {
- updateRequest.getApplyUpdatesStep().setFailureReason("Failed to apply updates to the Variable Registry");
- updateRequest.setFailureReason("Failed to apply updates to the Variable Registry");
- return;
+ updateRequest.getApplyUpdatesStep().setComplete(true);
+
+ if (applyUpdatesStatus == Status.OK.getStatusCode()) {
+ // grab the current process group revision
+ final VariableRegistryEntity entity = getResponseEntity(clusterResponse, VariableRegistryEntity.class);
+ updateRequest.setProcessGroupRevision(entity.getProcessGroupRevision());
+ } else {
+ final String message = getResponseEntity(clusterResponse, String.class);
+
+ // update the request progress
+ updateRequest.getApplyUpdatesStep().setFailureReason("Failed to apply updates to the Variable Registry: " + message);
+ updateRequest.setComplete(true);
+ updateRequest.setFailureReason("Failed to apply updates to the Variable Registry: " + message);
}
}
/**
+ * Extracts the response entity from the specified node response.
+ *
+ * @param nodeResponse node response
+ * @param clazz class
+ * @param <T> type of class
+ * @return the response entity
+ */
+ private <T> T getResponseEntity(final NodeResponse nodeResponse, final Class<T> clazz) {
+ T entity = (T) nodeResponse.getUpdatedEntity();
+ if (entity == null) {
+ entity = nodeResponse.getClientResponse().getEntity(clazz);
+ }
+ return entity;
+ }
+
+ /**
* Removes the specified process group reference.
*
* @param httpServletRequest request
@@ -1676,7 +1818,8 @@ public class ProcessGroupResource extends ApplicationResource {
value = "The process group id.",
required = true
)
- @PathParam("id") final String groupId) {
+ @PathParam("id") final String groupId,
+ @ApiParam("Whether or not to include processors from descendant process groups") @QueryParam("includeDescendantGroups") @DefaultValue("false") boolean includeDescendantGroups) {
if (isReplicateRequest()) {
return replicate(HttpMethod.GET);
@@ -1689,7 +1832,7 @@ public class ProcessGroupResource extends ApplicationResource {
});
// get the processors
- final Set<ProcessorEntity> processors = serviceFacade.getProcessors(groupId);
+ final Set<ProcessorEntity> processors = serviceFacade.getProcessors(groupId, includeDescendantGroups);
// create the response entity
final ProcessorsEntity entity = new ProcessorsEntity();
@@ -3121,7 +3264,6 @@ public class ProcessGroupResource extends ApplicationResource {
uriBuilder.segment("process-groups", groupId, "templates", "import");
final URI importUri = uriBuilder.build();
- // change content type to XML for serializing entity
final Map<String, String> headersToOverride = new HashMap<>();
headersToOverride.put("content-type", MediaType.APPLICATION_XML);
http://git-wip-us.apache.org/repos/asf/nifi/blob/eac47e90/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index ed42e9f..1a4d52b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -16,33 +16,6 @@
*/
package org.apache.nifi.web.api.dto;
-import java.text.Collator;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TimeZone;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-
-import javax.ws.rs.WebApplicationException;
-
import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.action.Action;
@@ -182,6 +155,7 @@ import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO;
import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusSnapshotDTO;
import org.apache.nifi.web.api.entity.AccessPolicyEntity;
import org.apache.nifi.web.api.entity.AccessPolicySummaryEntity;
+import org.apache.nifi.web.api.entity.AffectedComponentEntity;
import org.apache.nifi.web.api.entity.AllowableValueEntity;
import org.apache.nifi.web.api.entity.BulletinEntity;
import org.apache.nifi.web.api.entity.ComponentReferenceEntity;
@@ -196,6 +170,32 @@ import org.apache.nifi.web.api.entity.VariableEntity;
import org.apache.nifi.web.controller.ControllerFacade;
import org.apache.nifi.web.revision.RevisionManager;
+import javax.ws.rs.WebApplicationException;
+import java.text.Collator;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
public final class DtoFactory {
@SuppressWarnings("rawtypes")
@@ -1737,13 +1737,29 @@ public final class DtoFactory {
public AffectedComponentDTO createAffectedComponentDto(final ConfiguredComponent component) {
final AffectedComponentDTO dto = new AffectedComponentDTO();
- dto.setComponentId(component.getIdentifier());
- dto.setParentGroupId(component.getProcessGroupIdentifier());
+ dto.setId(component.getIdentifier());
+ dto.setName(component.getName());
+ dto.setProcessGroupId(component.getProcessGroupIdentifier());
if (component instanceof ProcessorNode) {
- dto.setComponentType(AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR);
+ final ProcessorNode node = ((ProcessorNode) component);
+ dto.setState(node.getScheduledState().name());
+ dto.setActiveThreadCount(node.getActiveThreadCount());
+ dto.setReferenceType(AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR);
} else if (component instanceof ControllerServiceNode) {
- dto.setComponentType(AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE);
+ final ControllerServiceNode node = ((ControllerServiceNode) component);
+ dto.setState(node.getState().name());
+ dto.setReferenceType(AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE);
+ }
+
+ final Collection<ValidationResult> validationErrors = component.getValidationErrors();
+ if (validationErrors != null && !validationErrors.isEmpty()) {
+ final List<String> errors = new ArrayList<>();
+ for (final ValidationResult validationResult : validationErrors) {
+ errors.add(validationResult.toString());
+ }
+
+ dto.setValidationErrors(errors);
}
return dto;
@@ -2114,8 +2130,18 @@ public final class DtoFactory {
return deprecationNotice == null ? null : deprecationNotice.reason();
}
+ public Set<AffectedComponentEntity> createAffectedComponentEntities(final Set<ConfiguredComponent> affectedComponents, final RevisionManager revisionManager) {
+ return affectedComponents.stream()
+ .map(component -> {
+ final AffectedComponentDTO affectedComponent = createAffectedComponentDto(component);
+ final PermissionsDTO permissions = createPermissionsDto(component);
+ final RevisionDTO revision = createRevisionDTO(revisionManager.getRevision(component.getIdentifier()));
+ return entityFactory.createAffectedComponentEntity(affectedComponent, revision, permissions);
+ })
+ .collect(Collectors.toSet());
+ }
- public VariableRegistryDTO createVariableRegistryDto(final ProcessGroup processGroup) {
+ public VariableRegistryDTO createVariableRegistryDto(final ProcessGroup processGroup, final RevisionManager revisionManager) {
final ComponentVariableRegistry variableRegistry = processGroup.getVariableRegistry();
final List<String> variableNames = variableRegistry.getVariableMap().keySet().stream()
@@ -2130,21 +2156,18 @@ public final class DtoFactory {
variableDto.setValue(variableRegistry.getVariableValue(variableName));
variableDto.setProcessGroupId(processGroup.getIdentifier());
- final Set<ConfiguredComponent> affectedComponents = processGroup.getComponentsAffectedByVariable(variableName);
- final Set<AffectedComponentDTO> affectedComponentDtos = affectedComponents.stream()
- .map(component -> createAffectedComponentDto(component))
- .collect(Collectors.toSet());
+ final Set<AffectedComponentEntity> affectedComponentEntities = createAffectedComponentEntities(processGroup.getComponentsAffectedByVariable(variableName), revisionManager);
boolean canWrite = true;
- for (final ConfiguredComponent component : affectedComponents) {
- final PermissionsDTO permissions = createPermissionsDto(component);
+ for (final AffectedComponentEntity affectedComponent : affectedComponentEntities) {
+ final PermissionsDTO permissions = affectedComponent.getPermissions();
if (!permissions.getCanRead() || !permissions.getCanWrite()) {
canWrite = false;
break;
}
}
- variableDto.setAffectedComponents(affectedComponentDtos);
+ variableDto.setAffectedComponents(affectedComponentEntities);
final VariableEntity variableEntity = new VariableEntity();
variableEntity.setVariable(variableDto);
@@ -2178,6 +2201,8 @@ public final class DtoFactory {
updateSteps.add(createVariableRegistryUpdateStepDto(request.getStartProcessorsStep()));
dto.setUpdateSteps(updateSteps);
+ dto.setAffectedComponents(new HashSet<>(request.getAffectedComponents().values()));
+
return dto;
}
@@ -2190,42 +2215,41 @@ public final class DtoFactory {
}
- public VariableRegistryDTO populateAffectedComponents(final VariableRegistryDTO variableRegistry, final ProcessGroup group) {
+ public VariableRegistryDTO populateAffectedComponents(final VariableRegistryDTO variableRegistry, final ProcessGroup group, final RevisionManager revisionManager) {
if (!group.getIdentifier().equals(variableRegistry.getProcessGroupId())) {
throw new IllegalArgumentException("Variable Registry does not have the same Group ID as the given Process Group");
}
final Set<VariableEntity> variableEntities = new LinkedHashSet<>();
- for (final VariableEntity inputEntity : variableRegistry.getVariables()) {
- final VariableEntity entity = new VariableEntity();
+ if (variableRegistry.getVariables() != null) {
+ for (final VariableEntity inputEntity : variableRegistry.getVariables()) {
+ final VariableEntity entity = new VariableEntity();
- final VariableDTO inputDto = inputEntity.getVariable();
- final VariableDTO variableDto = new VariableDTO();
- variableDto.setName(inputDto.getName());
- variableDto.setValue(inputDto.getValue());
- variableDto.setProcessGroupId(group.getIdentifier());
+ final VariableDTO inputDto = inputEntity.getVariable();
+ final VariableDTO variableDto = new VariableDTO();
+ variableDto.setName(inputDto.getName());
+ variableDto.setValue(inputDto.getValue());
+ variableDto.setProcessGroupId(group.getIdentifier());
- final Set<ConfiguredComponent> affectedComponents = group.getComponentsAffectedByVariable(variableDto.getName());
- final Set<AffectedComponentDTO> affectedComponentDtos = affectedComponents.stream()
- .map(component -> createAffectedComponentDto(component))
- .collect(Collectors.toSet());
+ final Set<AffectedComponentEntity> affectedComponentEntities = createAffectedComponentEntities(group.getComponentsAffectedByVariable(variableDto.getName()), revisionManager);
- boolean canWrite = true;
- for (final ConfiguredComponent component : affectedComponents) {
- final PermissionsDTO permissions = createPermissionsDto(component);
- if (!permissions.getCanRead() || !permissions.getCanWrite()) {
- canWrite = false;
- break;
+ boolean canWrite = true;
+ for (final AffectedComponentEntity affectedComponent : affectedComponentEntities) {
+ final PermissionsDTO permissions = affectedComponent.getPermissions();
+ if (!permissions.getCanRead() || !permissions.getCanWrite()) {
+ canWrite = false;
+ break;
+ }
}
- }
- variableDto.setAffectedComponents(affectedComponentDtos);
+ variableDto.setAffectedComponents(affectedComponentEntities);
- entity.setCanWrite(canWrite);
- entity.setVariable(inputDto);
+ entity.setCanWrite(canWrite);
+ entity.setVariable(inputDto);
- variableEntities.add(entity);
+ variableEntities.add(entity);
+ }
}
final VariableRegistryDTO registryDto = new VariableRegistryDTO();
http://git-wip-us.apache.org/repos/asf/nifi/blob/eac47e90/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java
index a7f370a..16781c6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java
@@ -33,6 +33,7 @@ import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
import org.apache.nifi.web.api.entity.AccessPolicyEntity;
import org.apache.nifi.web.api.entity.AccessPolicySummaryEntity;
import org.apache.nifi.web.api.entity.ActionEntity;
+import org.apache.nifi.web.api.entity.AffectedComponentEntity;
import org.apache.nifi.web.api.entity.AllowableValueEntity;
import org.apache.nifi.web.api.entity.BulletinEntity;
import org.apache.nifi.web.api.entity.ComponentReferenceEntity;
@@ -311,6 +312,20 @@ public final class EntityFactory {
return entity;
}
+ public AffectedComponentEntity createAffectedComponentEntity(final AffectedComponentDTO dto, final RevisionDTO revision, final PermissionsDTO permissions) {
+ final AffectedComponentEntity entity = new AffectedComponentEntity();
+ entity.setRevision(revision);
+ if (dto != null) {
+ entity.setPermissions(permissions);
+ entity.setId(dto.getId());
+
+ if (permissions != null && permissions.getCanRead()) {
+ entity.setComponent(dto);
+ }
+ }
+ return entity;
+ }
+
public UserGroupEntity createUserGroupEntity(final UserGroupDTO dto, final RevisionDTO revision, final PermissionsDTO permissions) {
final UserGroupEntity entity = new UserGroupEntity();
entity.setRevision(revision);
http://git-wip-us.apache.org/repos/asf/nifi/blob/eac47e90/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessorDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessorDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessorDAO.java
index 1d88161..a1bf170 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessorDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessorDAO.java
@@ -59,9 +59,10 @@ public interface ProcessorDAO {
* Gets all the Processor transfer objects for this controller.
*
* @param groupId group id
+ * @param includeDescendants if processors from descendant groups should be included
* @return List of all the Processors
*/
- Set<ProcessorNode> getProcessors(String groupId);
+ Set<ProcessorNode> getProcessors(String groupId, boolean includeDescendants);
/**
* Verifies the specified processor can be updated.
http://git-wip-us.apache.org/repos/asf/nifi/blob/eac47e90/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java
index e11f9ad..429592c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java
@@ -58,6 +58,7 @@ import java.util.Set;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
+import java.util.stream.Collectors;
public class StandardProcessorDAO extends ComponentDAO implements ProcessorDAO {
@@ -307,9 +308,13 @@ public class StandardProcessorDAO extends ComponentDAO implements ProcessorDAO {
}
@Override
- public Set<ProcessorNode> getProcessors(String groupId) {
+ public Set<ProcessorNode> getProcessors(String groupId, boolean includeDescendants) {
ProcessGroup group = locateProcessGroup(flowController, groupId);
- return group.getProcessors();
+ if (includeDescendants) {
+ return group.findAllProcessors().stream().collect(Collectors.toSet());
+ } else {
+ return group.getProcessors();
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/nifi/blob/eac47e90/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/pom.xml
index 0d60df2..72cd5ed 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/pom.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/pom.xml
@@ -461,6 +461,7 @@
<include>${staging.dir}/js/nf/canvas/nf-port-configuration.js</include>
<include>${staging.dir}/js/nf/canvas/nf-port-details.js</include>
<include>${staging.dir}/js/nf/canvas/nf-process-group-configuration.js</include>
+ <include>${staging.dir}/js/nf/canvas/nf-variable-registry.js</include>
<include>${staging.dir}/js/nf/canvas/nf-component-version.js</include>
<include>${staging.dir}/js/nf/canvas/nf-remote-process-group-configuration.js</include>
<include>${staging.dir}/js/nf/canvas/nf-remote-process-group-details.js</include>