You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2018/01/04 18:05:35 UTC
nifi git commit: NIFI-4733: - Resolving logic issue in two-phase
commit when updating variable registry. This closes #2370
Repository: nifi
Updated Branches:
refs/heads/master f7f001eb9 -> 7a8dbb8b1
NIFI-4733:
- Resolving logic issue in two-phase commit when updating variable registry. This closes #2370
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7a8dbb8b
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7a8dbb8b
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7a8dbb8b
Branch: refs/heads/master
Commit: 7a8dbb8b1512d7bd7a26e1b996a7ef859ccfcd4e
Parents: f7f001e
Author: Matt Gilman <ma...@gmail.com>
Authored: Wed Jan 3 11:00:22 2018 -0500
Committer: Matt Gilman <ma...@gmail.com>
Committed: Thu Jan 4 13:03:11 2018 -0500
----------------------------------------------------------------------
.../nifi/web/api/ProcessGroupResource.java | 87 ++++++++++++++------
1 file changed, 62 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/7a8dbb8b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
index 7ff7885..b866677 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
@@ -74,6 +74,7 @@ import org.apache.nifi.web.api.entity.ControllerServiceEntity;
import org.apache.nifi.web.api.entity.ControllerServicesEntity;
import org.apache.nifi.web.api.entity.CopySnippetRequestEntity;
import org.apache.nifi.web.api.entity.CreateTemplateRequestEntity;
+import org.apache.nifi.web.api.entity.Entity;
import org.apache.nifi.web.api.entity.FlowEntity;
import org.apache.nifi.web.api.entity.FunnelEntity;
import org.apache.nifi.web.api.entity.FunnelsEntity;
@@ -552,41 +553,43 @@ public class ProcessGroupResource extends ApplicationResource {
public Response updateVariableRegistry(
@Context final HttpServletRequest httpServletRequest,
@ApiParam(value = "The process group id.", required = true) @PathParam("id") final String groupId,
- @ApiParam(value = "The process group configuration details.", required = true) final VariableRegistryEntity requestEntity) {
+ @ApiParam(value = "The variable registry configuration details.", required = true) final VariableRegistryEntity requestVariableRegistryEntity) {
- if (requestEntity == null || requestEntity.getVariableRegistry() == null) {
+ if (requestVariableRegistryEntity == null || requestVariableRegistryEntity.getVariableRegistry() == null) {
throw new IllegalArgumentException("Variable Registry details must be specified.");
}
- if (requestEntity.getProcessGroupRevision() == null) {
+ if (requestVariableRegistryEntity.getProcessGroupRevision() == null) {
throw new IllegalArgumentException("Process Group Revision must be specified.");
}
// ensure the same id is being used
- final VariableRegistryDTO registryDto = requestEntity.getVariableRegistry();
- if (!groupId.equals(registryDto.getProcessGroupId())) {
+ final VariableRegistryDTO requestRegistryDto = requestVariableRegistryEntity.getVariableRegistry();
+ if (!groupId.equals(requestRegistryDto.getProcessGroupId())) {
throw new IllegalArgumentException(String.format("The process group id (%s) in the request body does "
- + "not equal the process group id of the requested resource (%s).", registryDto.getProcessGroupId(), groupId));
+ + "not equal the process group id of the requested resource (%s).", requestRegistryDto.getProcessGroupId(), groupId));
}
if (isReplicateRequest()) {
- return replicate(HttpMethod.PUT, requestEntity);
+ return replicate(HttpMethod.PUT, requestVariableRegistryEntity);
}
// handle expects request (usually from the cluster manager)
- final Revision requestRevision = getRevision(requestEntity.getProcessGroupRevision(), groupId);
+ final Revision requestRevision = getRevision(requestVariableRegistryEntity.getProcessGroupRevision(), groupId);
return withWriteLock(
serviceFacade,
- requestEntity,
+ requestVariableRegistryEntity,
requestRevision,
lookup -> {
Authorizable authorizable = lookup.getProcessGroup(groupId).getAuthorizable();
authorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
},
null,
- (revision, processGroupEntity) -> {
+ (revision, variableRegistryEntity) -> {
+ final VariableRegistryDTO variableRegistry = variableRegistryEntity.getVariableRegistry();
+
// update the process group
- final VariableRegistryEntity entity = serviceFacade.updateVariableRegistry(revision, registryDto);
+ final VariableRegistryEntity entity = serviceFacade.updateVariableRegistry(revision, variableRegistry);
return generateOkResponse(entity).build();
});
}
@@ -597,7 +600,7 @@ public class ProcessGroupResource extends ApplicationResource {
*
* @param httpServletRequest request
* @param groupId The id of the process group.
- * @param requestEntity the Variable Registry Entity
+ * @param requestVariableRegistryEntity the Variable Registry Entity
* @return A Variable Registry Entry.
*/
@POST
@@ -620,13 +623,13 @@ public class ProcessGroupResource extends ApplicationResource {
public Response submitUpdateVariableRegistryRequest(
@Context final HttpServletRequest httpServletRequest,
@ApiParam(value = "The process group id.", required = true) @PathParam("id") final String groupId,
- @ApiParam(value = "The process group configuration details.", required = true) final VariableRegistryEntity requestEntity) {
+ @ApiParam(value = "The variable registry configuration details.", required = true) final VariableRegistryEntity requestVariableRegistryEntity) {
- if (requestEntity == null || requestEntity.getVariableRegistry() == null) {
+ if (requestVariableRegistryEntity == null || requestVariableRegistryEntity.getVariableRegistry() == null) {
throw new IllegalArgumentException("Variable Registry details must be specified.");
}
- if (requestEntity.getProcessGroupRevision() == null) {
+ if (requestVariableRegistryEntity.getProcessGroupRevision() == null) {
throw new IllegalArgumentException("Process Group Revision must be specified.");
}
@@ -641,14 +644,14 @@ public class ProcessGroupResource extends ApplicationResource {
// 6. Re-Enable all previously Active Processors that Depended on the Controller Services
// Determine the affected components (and their associated revisions)
- final VariableRegistryEntity computedEntity = serviceFacade.populateAffectedComponents(requestEntity.getVariableRegistry());
+ final VariableRegistryEntity computedEntity = serviceFacade.populateAffectedComponents(requestVariableRegistryEntity.getVariableRegistry());
final VariableRegistryDTO computedRegistryDto = computedEntity.getVariableRegistry();
if (computedRegistryDto == null) {
throw new ResourceNotFoundException(String.format("Unable to locate group with id '%s'.", groupId));
}
- final Set<AffectedComponentEntity> allAffectedComponents = serviceFacade.getComponentsAffectedByVariableRegistryUpdate(requestEntity.getVariableRegistry());
- final Set<AffectedComponentDTO> activeAffectedComponents = serviceFacade.getActiveComponentsAffectedByVariableRegistryUpdate(requestEntity.getVariableRegistry());
+ final Set<AffectedComponentEntity> allAffectedComponents = serviceFacade.getComponentsAffectedByVariableRegistryUpdate(requestVariableRegistryEntity.getVariableRegistry());
+ final Set<AffectedComponentDTO> activeAffectedComponents = serviceFacade.getActiveComponentsAffectedByVariableRegistryUpdate(requestVariableRegistryEntity.getVariableRegistry());
final Map<String, List<AffectedComponentDTO>> activeAffectedComponentsByType = activeAffectedComponents.stream()
.collect(Collectors.groupingBy(comp -> comp.getReferenceType()));
@@ -698,7 +701,7 @@ public class ProcessGroupResource extends ApplicationResource {
final Authentication authentication = new NiFiAuthenticationToken(new NiFiUserDetails(user));
SecurityContextHolder.getContext().setAuthentication(authentication);
- updateVariableRegistryReplicated(groupId, originalUri, activeAffectedProcessors, activeAffectedServices, updateRequest, requestEntity);
+ updateVariableRegistryReplicated(groupId, originalUri, activeAffectedProcessors, activeAffectedServices, updateRequest, requestVariableRegistryEntity);
} catch (final Exception e) {
logger.error("Failed to update variable registry", e);
@@ -721,15 +724,19 @@ public class ProcessGroupResource extends ApplicationResource {
return Response.status(Status.ACCEPTED).location(location).entity(responseEntity).build();
}
+ final UpdateVariableRegistryRequestWrapper requestWrapper =
+ new UpdateVariableRegistryRequestWrapper(allAffectedComponents, activeAffectedProcessors, activeAffectedServices, requestVariableRegistryEntity);
- final Revision requestRevision = getRevision(requestEntity.getProcessGroupRevision(), groupId);
+ final Revision requestRevision = getRevision(requestVariableRegistryEntity.getProcessGroupRevision(), groupId);
return withWriteLock(
serviceFacade,
- requestEntity,
+ requestWrapper,
requestRevision,
authorizeAccess,
null,
- (revision, varRegistryEntity) -> updateVariableRegistryLocal(groupId, allAffectedComponents, activeAffectedProcessors, activeAffectedServices, user, requestEntity)
+ (revision, wrapper) ->
+ updateVariableRegistryLocal(groupId, wrapper.getAllAffectedComponents(), wrapper.getActiveAffectedProcessors(),
+ wrapper.getActiveAffectedServices(), user, revision, wrapper.getVariableRegistryEntity())
);
}
@@ -1088,7 +1095,7 @@ public class ProcessGroupResource extends ApplicationResource {
}
private Response updateVariableRegistryLocal(final String groupId, final Set<AffectedComponentEntity> affectedComponents, final List<AffectedComponentDTO> affectedProcessors,
- final List<AffectedComponentDTO> affectedServices, final NiFiUser user, final VariableRegistryEntity requestEntity) {
+ final List<AffectedComponentDTO> affectedServices, final NiFiUser user, final Revision requestRevision, final VariableRegistryEntity requestEntity) {
final Set<String> affectedProcessorIds = affectedProcessors == null ? Collections.emptySet() : affectedProcessors.stream()
.map(component -> component.getId())
@@ -1105,8 +1112,6 @@ public class ProcessGroupResource extends ApplicationResource {
updateRequest.getIdentifyRelevantComponentsStep().setComplete(true);
final Pause pause = createPause(updateRequest);
- final Revision requestRevision = getRevision(requestEntity.getProcessGroupRevision(), groupId);
-
final Runnable updateTask = new Runnable() {
@Override
public void run() {
@@ -3474,6 +3479,38 @@ public class ProcessGroupResource extends ApplicationResource {
);
}
+ private static class UpdateVariableRegistryRequestWrapper extends Entity {
+ private final Set<AffectedComponentEntity> allAffectedComponents;
+ private final List<AffectedComponentDTO> activeAffectedProcessors;
+ private final List<AffectedComponentDTO> activeAffectedServices;
+ private final VariableRegistryEntity variableRegistryEntity;
+
+ public UpdateVariableRegistryRequestWrapper(final Set<AffectedComponentEntity> allAffectedComponents, final List<AffectedComponentDTO> activeAffectedProcessors,
+ final List<AffectedComponentDTO> activeAffectedServices, VariableRegistryEntity variableRegistryEntity) {
+
+ this.allAffectedComponents = allAffectedComponents;
+ this.activeAffectedProcessors = activeAffectedProcessors;
+ this.activeAffectedServices = activeAffectedServices;
+ this.variableRegistryEntity = variableRegistryEntity;
+ }
+
+ public Set<AffectedComponentEntity> getAllAffectedComponents() {
+ return allAffectedComponents;
+ }
+
+ public List<AffectedComponentDTO> getActiveAffectedProcessors() {
+ return activeAffectedProcessors;
+ }
+
+ public List<AffectedComponentDTO> getActiveAffectedServices() {
+ return activeAffectedServices;
+ }
+
+ public VariableRegistryEntity getVariableRegistryEntity() {
+ return variableRegistryEntity;
+ }
+ }
+
// setters
public void setServiceFacade(NiFiServiceFacade serviceFacade) {