You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2018/01/04 18:05:35 UTC

nifi git commit: NIFI-4733: - Resolving logic issue in two phase commit when updating variable registry. This closes #2370

Repository: nifi
Updated Branches:
  refs/heads/master f7f001eb9 -> 7a8dbb8b1


NIFI-4733:
- Resolving logic issue in two phase commit when updating variable registry. This closes #2370


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7a8dbb8b
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7a8dbb8b
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7a8dbb8b

Branch: refs/heads/master
Commit: 7a8dbb8b1512d7bd7a26e1b996a7ef859ccfcd4e
Parents: f7f001e
Author: Matt Gilman <ma...@gmail.com>
Authored: Wed Jan 3 11:00:22 2018 -0500
Committer: Matt Gilman <ma...@gmail.com>
Committed: Thu Jan 4 13:03:11 2018 -0500

----------------------------------------------------------------------
 .../nifi/web/api/ProcessGroupResource.java      | 87 ++++++++++++++------
 1 file changed, 62 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/7a8dbb8b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
index 7ff7885..b866677 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
@@ -74,6 +74,7 @@ import org.apache.nifi.web.api.entity.ControllerServiceEntity;
 import org.apache.nifi.web.api.entity.ControllerServicesEntity;
 import org.apache.nifi.web.api.entity.CopySnippetRequestEntity;
 import org.apache.nifi.web.api.entity.CreateTemplateRequestEntity;
+import org.apache.nifi.web.api.entity.Entity;
 import org.apache.nifi.web.api.entity.FlowEntity;
 import org.apache.nifi.web.api.entity.FunnelEntity;
 import org.apache.nifi.web.api.entity.FunnelsEntity;
@@ -552,41 +553,43 @@ public class ProcessGroupResource extends ApplicationResource {
     public Response updateVariableRegistry(
         @Context final HttpServletRequest httpServletRequest,
         @ApiParam(value = "The process group id.", required = true) @PathParam("id") final String groupId,
-        @ApiParam(value = "The process group configuration details.", required = true) final VariableRegistryEntity requestEntity) {
+        @ApiParam(value = "The variable registry configuration details.", required = true) final VariableRegistryEntity requestVariableRegistryEntity) {
 
-        if (requestEntity == null || requestEntity.getVariableRegistry() == null) {
+        if (requestVariableRegistryEntity == null || requestVariableRegistryEntity.getVariableRegistry() == null) {
             throw new IllegalArgumentException("Variable Registry details must be specified.");
         }
 
-        if (requestEntity.getProcessGroupRevision() == null) {
+        if (requestVariableRegistryEntity.getProcessGroupRevision() == null) {
             throw new IllegalArgumentException("Process Group Revision must be specified.");
         }
 
         // ensure the same id is being used
-        final VariableRegistryDTO registryDto = requestEntity.getVariableRegistry();
-        if (!groupId.equals(registryDto.getProcessGroupId())) {
+        final VariableRegistryDTO requestRegistryDto = requestVariableRegistryEntity.getVariableRegistry();
+        if (!groupId.equals(requestRegistryDto.getProcessGroupId())) {
             throw new IllegalArgumentException(String.format("The process group id (%s) in the request body does "
-                + "not equal the process group id of the requested resource (%s).", registryDto.getProcessGroupId(), groupId));
+                + "not equal the process group id of the requested resource (%s).", requestRegistryDto.getProcessGroupId(), groupId));
         }
 
         if (isReplicateRequest()) {
-            return replicate(HttpMethod.PUT, requestEntity);
+            return replicate(HttpMethod.PUT, requestVariableRegistryEntity);
         }
 
         // handle expects request (usually from the cluster manager)
-        final Revision requestRevision = getRevision(requestEntity.getProcessGroupRevision(), groupId);
+        final Revision requestRevision = getRevision(requestVariableRegistryEntity.getProcessGroupRevision(), groupId);
         return withWriteLock(
             serviceFacade,
-            requestEntity,
+            requestVariableRegistryEntity,
             requestRevision,
             lookup -> {
                 Authorizable authorizable = lookup.getProcessGroup(groupId).getAuthorizable();
                 authorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
             },
             null,
-            (revision, processGroupEntity) -> {
+            (revision, variableRegistryEntity) -> {
+                final VariableRegistryDTO variableRegistry = variableRegistryEntity.getVariableRegistry();
+
                 // update the process group
-                final VariableRegistryEntity entity = serviceFacade.updateVariableRegistry(revision, registryDto);
+                final VariableRegistryEntity entity = serviceFacade.updateVariableRegistry(revision, variableRegistry);
                 return generateOkResponse(entity).build();
             });
     }
@@ -597,7 +600,7 @@ public class ProcessGroupResource extends ApplicationResource {
      *
      * @param httpServletRequest request
      * @param groupId The id of the process group.
-     * @param requestEntity the Variable Registry Entity
+     * @param requestVariableRegistryEntity the Variable Registry Entity
      * @return A Variable Registry Entry.
      */
     @POST
@@ -620,13 +623,13 @@ public class ProcessGroupResource extends ApplicationResource {
     public Response submitUpdateVariableRegistryRequest(
         @Context final HttpServletRequest httpServletRequest,
         @ApiParam(value = "The process group id.", required = true) @PathParam("id") final String groupId,
-        @ApiParam(value = "The process group configuration details.", required = true) final VariableRegistryEntity requestEntity) {
+        @ApiParam(value = "The variable registry configuration details.", required = true) final VariableRegistryEntity requestVariableRegistryEntity) {
 
-        if (requestEntity == null || requestEntity.getVariableRegistry() == null) {
+        if (requestVariableRegistryEntity == null || requestVariableRegistryEntity.getVariableRegistry() == null) {
             throw new IllegalArgumentException("Variable Registry details must be specified.");
         }
 
-        if (requestEntity.getProcessGroupRevision() == null) {
+        if (requestVariableRegistryEntity.getProcessGroupRevision() == null) {
             throw new IllegalArgumentException("Process Group Revision must be specified.");
         }
 
@@ -641,14 +644,14 @@ public class ProcessGroupResource extends ApplicationResource {
         // 6. Re-Enable all previously Active Processors that Depended on the Controller Services
 
         // Determine the affected components (and their associated revisions)
-        final VariableRegistryEntity computedEntity = serviceFacade.populateAffectedComponents(requestEntity.getVariableRegistry());
+        final VariableRegistryEntity computedEntity = serviceFacade.populateAffectedComponents(requestVariableRegistryEntity.getVariableRegistry());
         final VariableRegistryDTO computedRegistryDto = computedEntity.getVariableRegistry();
         if (computedRegistryDto == null) {
             throw new ResourceNotFoundException(String.format("Unable to locate group with id '%s'.", groupId));
         }
 
-        final Set<AffectedComponentEntity> allAffectedComponents = serviceFacade.getComponentsAffectedByVariableRegistryUpdate(requestEntity.getVariableRegistry());
-        final Set<AffectedComponentDTO> activeAffectedComponents = serviceFacade.getActiveComponentsAffectedByVariableRegistryUpdate(requestEntity.getVariableRegistry());
+        final Set<AffectedComponentEntity> allAffectedComponents = serviceFacade.getComponentsAffectedByVariableRegistryUpdate(requestVariableRegistryEntity.getVariableRegistry());
+        final Set<AffectedComponentDTO> activeAffectedComponents = serviceFacade.getActiveComponentsAffectedByVariableRegistryUpdate(requestVariableRegistryEntity.getVariableRegistry());
 
         final Map<String, List<AffectedComponentDTO>> activeAffectedComponentsByType = activeAffectedComponents.stream()
             .collect(Collectors.groupingBy(comp -> comp.getReferenceType()));
@@ -698,7 +701,7 @@ public class ProcessGroupResource extends ApplicationResource {
                     final Authentication authentication = new NiFiAuthenticationToken(new NiFiUserDetails(user));
                     SecurityContextHolder.getContext().setAuthentication(authentication);
 
-                    updateVariableRegistryReplicated(groupId, originalUri, activeAffectedProcessors, activeAffectedServices, updateRequest, requestEntity);
+                    updateVariableRegistryReplicated(groupId, originalUri, activeAffectedProcessors, activeAffectedServices, updateRequest, requestVariableRegistryEntity);
                 } catch (final Exception e) {
                     logger.error("Failed to update variable registry", e);
 
@@ -721,15 +724,19 @@ public class ProcessGroupResource extends ApplicationResource {
             return Response.status(Status.ACCEPTED).location(location).entity(responseEntity).build();
         }
 
+        final UpdateVariableRegistryRequestWrapper requestWrapper =
+                new UpdateVariableRegistryRequestWrapper(allAffectedComponents, activeAffectedProcessors, activeAffectedServices, requestVariableRegistryEntity);
 
-        final Revision requestRevision = getRevision(requestEntity.getProcessGroupRevision(), groupId);
+        final Revision requestRevision = getRevision(requestVariableRegistryEntity.getProcessGroupRevision(), groupId);
         return withWriteLock(
             serviceFacade,
-            requestEntity,
+            requestWrapper,
             requestRevision,
             authorizeAccess,
             null,
-            (revision, varRegistryEntity) -> updateVariableRegistryLocal(groupId, allAffectedComponents, activeAffectedProcessors, activeAffectedServices, user, requestEntity)
+            (revision, wrapper) ->
+                    updateVariableRegistryLocal(groupId, wrapper.getAllAffectedComponents(), wrapper.getActiveAffectedProcessors(),
+                        wrapper.getActiveAffectedServices(), user, revision, wrapper.getVariableRegistryEntity())
         );
     }
 
@@ -1088,7 +1095,7 @@ public class ProcessGroupResource extends ApplicationResource {
     }
 
     private Response updateVariableRegistryLocal(final String groupId, final Set<AffectedComponentEntity> affectedComponents, final List<AffectedComponentDTO> affectedProcessors,
-                                                 final List<AffectedComponentDTO> affectedServices, final NiFiUser user, final VariableRegistryEntity requestEntity) {
+                                                 final List<AffectedComponentDTO> affectedServices, final NiFiUser user, final Revision requestRevision, final VariableRegistryEntity requestEntity) {
 
         final Set<String> affectedProcessorIds = affectedProcessors == null ? Collections.emptySet() : affectedProcessors.stream()
             .map(component -> component.getId())
@@ -1105,8 +1112,6 @@ public class ProcessGroupResource extends ApplicationResource {
         updateRequest.getIdentifyRelevantComponentsStep().setComplete(true);
         final Pause pause = createPause(updateRequest);
 
-        final Revision requestRevision = getRevision(requestEntity.getProcessGroupRevision(), groupId);
-
         final Runnable updateTask = new Runnable() {
             @Override
             public void run() {
@@ -3474,6 +3479,38 @@ public class ProcessGroupResource extends ApplicationResource {
         );
     }
 
+    private static class UpdateVariableRegistryRequestWrapper extends Entity {
+        private final Set<AffectedComponentEntity> allAffectedComponents;
+        private final List<AffectedComponentDTO> activeAffectedProcessors;
+        private final List<AffectedComponentDTO> activeAffectedServices;
+        private final VariableRegistryEntity variableRegistryEntity;
+
+        public UpdateVariableRegistryRequestWrapper(final Set<AffectedComponentEntity> allAffectedComponents, final List<AffectedComponentDTO> activeAffectedProcessors,
+                                                    final List<AffectedComponentDTO> activeAffectedServices, VariableRegistryEntity variableRegistryEntity) {
+
+            this.allAffectedComponents = allAffectedComponents;
+            this.activeAffectedProcessors = activeAffectedProcessors;
+            this.activeAffectedServices = activeAffectedServices;
+            this.variableRegistryEntity = variableRegistryEntity;
+        }
+
+        public Set<AffectedComponentEntity> getAllAffectedComponents() {
+            return allAffectedComponents;
+        }
+
+        public List<AffectedComponentDTO> getActiveAffectedProcessors() {
+            return activeAffectedProcessors;
+        }
+
+        public List<AffectedComponentDTO> getActiveAffectedServices() {
+            return activeAffectedServices;
+        }
+
+        public VariableRegistryEntity getVariableRegistryEntity() {
+            return variableRegistryEntity;
+        }
+    }
+
     // setters
 
     public void setServiceFacade(NiFiServiceFacade serviceFacade) {