You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2018/01/08 18:14:11 UTC
[23/50] nifi git commit: NIFI-4436: Integrate with actual Flow
Registry via REST Client - Store Bucket Name, Flow Name,
Flow Description for VersionControlInformation - Added endpoint for
determining local modifications to a process group - Updated autho[...] (subject line truncated)
http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
index b010bf3..b808ae6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
@@ -17,15 +17,40 @@
package org.apache.nifi.web.api;
-import io.swagger.annotations.Api;
-import io.swagger.annotations.ApiOperation;
-import io.swagger.annotations.ApiParam;
-import io.swagger.annotations.ApiResponse;
-import io.swagger.annotations.ApiResponses;
-import io.swagger.annotations.Authorization;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedHashMap;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+
import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.authorization.AuthorizableLookup;
import org.apache.nifi.authorization.Authorizer;
+import org.apache.nifi.authorization.ProcessGroupAuthorizable;
import org.apache.nifi.authorization.RequestAction;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.user.NiFiUser;
@@ -34,7 +59,6 @@ import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.service.ControllerServiceState;
-import org.apache.nifi.registry.flow.ComponentType;
import org.apache.nifi.registry.flow.FlowRegistryUtils;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
@@ -55,9 +79,9 @@ import org.apache.nifi.web.api.dto.VersionedFlowDTO;
import org.apache.nifi.web.api.dto.VersionedFlowUpdateRequestDTO;
import org.apache.nifi.web.api.entity.AffectedComponentEntity;
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
+import org.apache.nifi.web.api.entity.StartVersionControlRequestEntity;
import org.apache.nifi.web.api.entity.VersionControlComponentMappingEntity;
import org.apache.nifi.web.api.entity.VersionControlInformationEntity;
-import org.apache.nifi.web.api.entity.StartVersionControlRequestEntity;
import org.apache.nifi.web.api.entity.VersionedFlowSnapshotEntity;
import org.apache.nifi.web.api.entity.VersionedFlowUpdateRequestEntity;
import org.apache.nifi.web.api.request.ClientIdParameter;
@@ -70,37 +94,12 @@ import org.apache.nifi.web.util.Pause;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DELETE;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.GET;
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.POST;
-import javax.ws.rs.PUT;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.MultivaluedHashMap;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.Response.Status;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import java.util.stream.Collectors;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import io.swagger.annotations.Authorization;
@Path("/versions")
@Api(value = "/versions", description = "Endpoint for managing version control for a flow")
@@ -356,7 +355,10 @@ public class VersionsResource extends ApplicationResource {
response = VersionControlInformationEntity.class,
notes = NON_GUARANTEED_ENDPOINT,
authorizations = {
- @Authorization(value = "Read - /process-groups/{uuid}")
+ @Authorization(value = "Read - /process-groups/{uuid}"),
+ @Authorization(value = "Write - /process-groups/{uuid}"),
+ @Authorization(value = "Read - /{component-type}/{uuid} - For all encapsulated components"),
+ @Authorization(value = "Read - any referenced Controller Services by any encapsulated components - /controller-services/{uuid}")
})
@ApiResponses(value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@@ -500,9 +502,11 @@ public class VersionsResource extends ApplicationResource {
requestEntity,
groupRevision,
lookup -> {
- final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable();
+ final ProcessGroupAuthorizable groupAuthorizable = lookup.getProcessGroup(groupId);
+ final Authorizable processGroup = groupAuthorizable.getAuthorizable();
processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
+ super.authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.READ, true, false, true, true);
},
() -> {
final VersionControlInformationEntity entity = serviceFacade.getVersionControlInformation(groupId);
@@ -663,22 +667,21 @@ public class VersionsResource extends ApplicationResource {
throw new IllegalArgumentException("The Flow ID must be supplied.");
}
-
// Perform the request
if (isReplicateRequest()) {
return replicate(HttpMethod.PUT, requestEntity);
}
- // Determine which components will be affected by updating the version
- final Set<AffectedComponentEntity> affectedComponents = serviceFacade.getComponentsAffectedByVersionChange(groupId, flowSnapshot, NiFiUserUtils.getNiFiUser());
-
final Revision requestRevision = getRevision(requestEntity.getProcessGroupRevision(), groupId);
return withWriteLock(
serviceFacade,
requestEntity,
requestRevision,
lookup -> {
- authorizeAffectedComponents(lookup, affectedComponents);
+ final ProcessGroupAuthorizable groupAuthorizable = lookup.getProcessGroup(groupId);
+ final Authorizable processGroup = groupAuthorizable.getAuthorizable();
+ processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
+ processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
},
() -> {
// We do not enforce that the Process Group is 'not dirty' because at this point,
@@ -691,13 +694,16 @@ public class VersionsResource extends ApplicationResource {
// Update the Process Group to match the proposed flow snapshot
final VersionControlInformationDTO versionControlInfoDto = new VersionControlInformationDTO();
versionControlInfoDto.setBucketId(snapshotMetadata.getBucketIdentifier());
+ versionControlInfoDto.setBucketName(snapshotMetadata.getBucketName());
versionControlInfoDto.setCurrent(true);
versionControlInfoDto.setFlowId(snapshotMetadata.getFlowIdentifier());
versionControlInfoDto.setFlowName(snapshotMetadata.getFlowName());
+ versionControlInfoDto.setFlowDescription(snapshotMetadata.getFlowDescription());
versionControlInfoDto.setGroupId(groupId);
versionControlInfoDto.setModified(false);
versionControlInfoDto.setVersion(snapshotMetadata.getVersion());
versionControlInfoDto.setRegistryId(requestEntity.getRegistryId());
+ versionControlInfoDto.setRegistryName(serviceFacade.getFlowRegistryName(requestEntity.getRegistryId()));
final ProcessGroupEntity updatedGroup = serviceFacade.updateProcessGroup(rev, groupId, versionControlInfoDto, flowSnapshot, getIdGenerationSeed().orElse(null), false);
final VersionControlInformationDTO updatedVci = updatedGroup.getComponent().getVersionControlInformation();
@@ -769,6 +775,13 @@ public class VersionsResource extends ApplicationResource {
updateRequestDto.setProcessGroupId(asyncRequest.getProcessGroupId());
updateRequestDto.setRequestId(requestId);
updateRequestDto.setUri(generateResourceUri("versions", requestType, requestId));
+ updateRequestDto.setState(asyncRequest.getState());
+ updateRequestDto.setPercentComplete(asyncRequest.getPercentComplete());
+
+ if (updateRequestDto.isComplete()) {
+ final VersionControlInformationEntity vciEntity = serviceFacade.getVersionControlInformation(asyncRequest.getProcessGroupId());
+ updateRequestDto.setVersionControlInformation(vciEntity == null ? null : vciEntity.getVersionControlInformation());
+ }
final RevisionDTO groupRevision = serviceFacade.getProcessGroup(asyncRequest.getProcessGroupId()).getRevision();
@@ -830,6 +843,13 @@ public class VersionsResource extends ApplicationResource {
final NiFiUser user = NiFiUserUtils.getNiFiUser();
final AsynchronousWebRequest<VersionControlInformationEntity> asyncRequest = requestManager.removeRequest(requestType, requestId, user);
+ if (asyncRequest == null) {
+ throw new ResourceNotFoundException("Could not find request of type " + requestType + " with ID " + requestId);
+ }
+
+ if (!asyncRequest.isComplete()) {
+ asyncRequest.cancel();
+ }
final VersionedFlowUpdateRequestDTO updateRequestDto = new VersionedFlowUpdateRequestDTO();
updateRequestDto.setComplete(asyncRequest.isComplete());
@@ -838,6 +858,13 @@ public class VersionsResource extends ApplicationResource {
updateRequestDto.setProcessGroupId(asyncRequest.getProcessGroupId());
updateRequestDto.setRequestId(requestId);
updateRequestDto.setUri(generateResourceUri("versions", requestType, requestId));
+ updateRequestDto.setPercentComplete(asyncRequest.getPercentComplete());
+ updateRequestDto.setState(asyncRequest.getState());
+
+ if (updateRequestDto.isComplete()) {
+ final VersionControlInformationEntity vciEntity = serviceFacade.getVersionControlInformation(asyncRequest.getProcessGroupId());
+ updateRequestDto.setVersionControlInformation(vciEntity == null ? null : vciEntity.getVersionControlInformation());
+ }
final RevisionDTO groupRevision = serviceFacade.getProcessGroup(asyncRequest.getProcessGroupId()).getRevision();
@@ -861,7 +888,10 @@ public class VersionsResource extends ApplicationResource {
notes = NON_GUARANTEED_ENDPOINT,
authorizations = {
@Authorization(value = "Read - /process-groups/{uuid}"),
- @Authorization(value = "Write - /process-groups/{uuid}")
+ @Authorization(value = "Write - /process-groups/{uuid}"),
+ @Authorization(value = "Read - /{component-type}/{uuid} - For all encapsulated components"),
+ @Authorization(value = "Write - /{component-type}/{uuid} - For all encapsulated components"),
+ @Authorization(value = "Write - if the template contains any restricted components - /restricted-components")
})
@ApiResponses(value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@@ -924,7 +954,7 @@ public class VersionsResource extends ApplicationResource {
// a. Component itself is modified in some way, other than position changing.
// b. Source and Destination of any Connection that is modified.
// c. Any Processor or Controller Service that references a Controller Service that is modified.
- // 2. Verify READ and WRITE permissions for user, for every component affected.
+ // 2. Verify READ and WRITE permissions for user, for every component.
// 3. Verify that all components in the snapshot exist on all nodes (i.e., the NAR exists)?
// 4. Verify that Process Group is already under version control. If not, must start Version Control instead of updateFlow
// 5. Verify that Process Group is not 'dirty'.
@@ -961,8 +991,13 @@ public class VersionsResource extends ApplicationResource {
requestEntity,
requestRevision,
lookup -> {
- // Step 2: Verify READ and WRITE permissions for user, for every component affected.
- authorizeAffectedComponents(lookup, affectedComponents);
+ // Step 2: Verify READ and WRITE permissions for user, for every component.
+ final ProcessGroupAuthorizable groupAuthorizable = lookup.getProcessGroup(groupId);
+ final Authorizable processGroup = groupAuthorizable.getAuthorizable();
+ processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
+ processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
+ super.authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.READ, true, false, true, true);
+ super.authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.WRITE, true, false, true, true);
final VersionedProcessGroup groupContents = flowSnapshot.getFlowContents();
final boolean containsRestrictedComponents = FlowRegistryUtils.containsRestrictedComponent(groupContents);
@@ -980,7 +1015,7 @@ public class VersionsResource extends ApplicationResource {
// Create an asynchronous request that will occur in the background, because this request may
// result in stopping components, which can take an indeterminate amount of time.
final String requestId = UUID.randomUUID().toString();
- final AsynchronousWebRequest<VersionControlInformationEntity> request = new StandardAsynchronousWebRequest<>(requestId, groupId, user);
+ final AsynchronousWebRequest<VersionControlInformationEntity> request = new StandardAsynchronousWebRequest<>(requestId, groupId, user, "Stopping Processors");
// Submit the request to be performed in the background
final Consumer<AsynchronousWebRequest<VersionControlInformationEntity>> updateTask = vcur -> {
@@ -1005,6 +1040,8 @@ public class VersionsResource extends ApplicationResource {
updateRequestDto.setProcessGroupId(groupId);
updateRequestDto.setRequestId(requestId);
updateRequestDto.setUri(generateResourceUri("versions", "update-requests", requestId));
+ updateRequestDto.setPercentComplete(request.getPercentComplete());
+ updateRequestDto.setState(request.getState());
final VersionedFlowUpdateRequestEntity updateRequestEntity = new VersionedFlowUpdateRequestEntity();
final RevisionDTO groupRevision = dtoFactory.createRevisionDTO(revision);
@@ -1029,7 +1066,10 @@ public class VersionsResource extends ApplicationResource {
notes = NON_GUARANTEED_ENDPOINT,
authorizations = {
@Authorization(value = "Read - /process-groups/{uuid}"),
- @Authorization(value = "Write - /process-groups/{uuid}")
+ @Authorization(value = "Write - /process-groups/{uuid}"),
+ @Authorization(value = "Read - /{component-type}/{uuid} - For all encapsulated components"),
+ @Authorization(value = "Write - /{component-type}/{uuid} - For all encapsulated components"),
+ @Authorization(value = "Write - if the template contains any restricted components - /restricted-components")
})
@ApiResponses(value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@@ -1095,8 +1135,13 @@ public class VersionsResource extends ApplicationResource {
requestEntity,
requestRevision,
lookup -> {
- // Step 2: Verify READ and WRITE permissions for user, for every component affected.
- authorizeAffectedComponents(lookup, affectedComponents);
+ // Step 2: Verify READ and WRITE permissions for user, for every component.
+ final ProcessGroupAuthorizable groupAuthorizable = lookup.getProcessGroup(groupId);
+ final Authorizable processGroup = groupAuthorizable.getAuthorizable();
+ processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
+ processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
+ super.authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.READ, true, false, true, true);
+ super.authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.WRITE, true, false, true, true);
final VersionedProcessGroup groupContents = flowSnapshot.getFlowContents();
final boolean containsRestrictedComponents = FlowRegistryUtils.containsRestrictedComponent(groupContents);
@@ -1134,7 +1179,7 @@ public class VersionsResource extends ApplicationResource {
// If the information passed in is correct, but there have been no changes, there is nothing to do - just register the request, mark it complete, and return.
if (currentVersion.getModified() == Boolean.FALSE) {
final String requestId = UUID.randomUUID().toString();
- final AsynchronousWebRequest<VersionControlInformationEntity> request = new StandardAsynchronousWebRequest<>(requestId, groupId, user);
+ final AsynchronousWebRequest<VersionControlInformationEntity> request = new StandardAsynchronousWebRequest<>(requestId, groupId, user, "Complete");
requestManager.submitRequest("revert-requests", requestId, request, task -> {
});
@@ -1145,7 +1190,10 @@ public class VersionsResource extends ApplicationResource {
updateRequestDto.setLastUpdated(new Date());
updateRequestDto.setProcessGroupId(groupId);
updateRequestDto.setRequestId(requestId);
+ updateRequestDto.setVersionControlInformation(currentVersion);
updateRequestDto.setUri(generateResourceUri("versions", "revert-requests", requestId));
+ updateRequestDto.setPercentComplete(100);
+ updateRequestDto.setState(request.getState());
final VersionedFlowUpdateRequestEntity updateRequestEntity = new VersionedFlowUpdateRequestEntity();
updateRequestEntity.setProcessGroupRevision(revisionDto);
@@ -1159,19 +1207,18 @@ public class VersionsResource extends ApplicationResource {
// Create an asynchronous request that will occur in the background, because this request may
// result in stopping components, which can take an indeterminate amount of time.
final String requestId = UUID.randomUUID().toString();
- final AsynchronousWebRequest<VersionControlInformationEntity> request = new StandardAsynchronousWebRequest<>(requestId, groupId, user);
+ final AsynchronousWebRequest<VersionControlInformationEntity> request = new StandardAsynchronousWebRequest<>(requestId, groupId, user, "Stopping Processors");
// Submit the request to be performed in the background
final Consumer<AsynchronousWebRequest<VersionControlInformationEntity>> updateTask = vcur -> {
try {
- // TODO: change the URI to the new endpoint for 'revert' instead of 'change version'
final VersionControlInformationEntity updatedVersionControlEntity = updateFlowVersion(groupId, componentLifecycle, exampleUri,
affectedComponents, user, replicateRequest, requestEntity, flowSnapshot, request, idGenerationSeed, false);
vcur.markComplete(updatedVersionControlEntity);
} catch (final LifecycleManagementException e) {
logger.error("Failed to update flow to new version", e);
- vcur.setFailureReason("Failed to update flow to new version due to " + e);
+ vcur.setFailureReason("Failed to update flow to new version due to " + e.getMessage());
}
};
@@ -1201,7 +1248,6 @@ public class VersionsResource extends ApplicationResource {
final boolean verifyNotModified) throws LifecycleManagementException {
// Steps 6-7: Determine which components must be stopped and stop them.
- // Do we need to stop other types? Input Ports, Output Ports, Funnels, RPGs, etc.
final Set<String> stoppableReferenceTypes = new HashSet<>();
stoppableReferenceTypes.add(AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR);
stoppableReferenceTypes.add(AffectedComponentDTO.COMPONENT_TYPE_REMOTE_INPUT_PORT);
@@ -1215,7 +1261,11 @@ public class VersionsResource extends ApplicationResource {
logger.info("Stopping {} Processors", runningComponents.size());
final Pause stopComponentsPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
componentLifecycle.scheduleComponents(exampleUri, user, groupId, runningComponents, ScheduledState.STOPPED, stopComponentsPause);
- asyncRequest.setLastUpdated(new Date());
+
+ if (asyncRequest.isCancelled()) {
+ return null;
+ }
+ asyncRequest.update(new Date(), "Disabling Affected Controller Services", 20);
// Steps 8-9. Disable enabled controller services that are affected
final Set<AffectedComponentEntity> enabledServices = affectedComponents.stream()
@@ -1226,7 +1276,11 @@ public class VersionsResource extends ApplicationResource {
logger.info("Disabling {} Controller Services", enabledServices.size());
final Pause disableServicesPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
componentLifecycle.activateControllerServices(exampleUri, user, groupId, enabledServices, ControllerServiceState.DISABLED, disableServicesPause);
- asyncRequest.setLastUpdated(new Date());
+
+ if (asyncRequest.isCancelled()) {
+ return null;
+ }
+ asyncRequest.update(new Date(), "Updating Flow", 40);
logger.info("Updating Process Group with ID {} to version {} of the Versioned Flow", groupId, flowSnapshot.getSnapshotMetadata().getVersion());
// If replicating request, steps 10-12 are performed on each node individually, and this is accomplished
@@ -1281,21 +1335,32 @@ public class VersionsResource extends ApplicationResource {
serviceFacade.updateProcessGroupContents(user, revision, groupId, vci, flowSnapshot, idGenerationSeed, verifyNotModified, false);
}
- asyncRequest.setLastUpdated(new Date());
+ if (asyncRequest.isCancelled()) {
+ return null;
+ }
+ asyncRequest.update(new Date(), "Re-Enabling Controller Services", 60);
// Step 13. Re-enable all disabled controller services
final Pause enableServicesPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
final Set<AffectedComponentEntity> servicesToEnable = getUpdatedEntities(enabledServices, user);
logger.info("Successfully updated flow; re-enabling {} Controller Services", servicesToEnable.size());
componentLifecycle.activateControllerServices(exampleUri, user, groupId, servicesToEnable, ControllerServiceState.ENABLED, enableServicesPause);
- asyncRequest.setLastUpdated(new Date());
+
+ if (asyncRequest.isCancelled()) {
+ return null;
+ }
+ asyncRequest.update(new Date(), "Restarting Processors", 80);
// Step 14. Restart all components
final Set<AffectedComponentEntity> componentsToStart = getUpdatedEntities(runningComponents, user);
final Pause startComponentsPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
logger.info("Restarting {} Processors", componentsToStart.size());
componentLifecycle.scheduleComponents(exampleUri, user, groupId, componentsToStart, ScheduledState.RUNNING, startComponentsPause);
- asyncRequest.setLastUpdated(new Date());
+
+ if (asyncRequest.isCancelled()) {
+ return null;
+ }
+ asyncRequest.update(new Date(), "Complete", 100);
return serviceFacade.getVersionControlInformation(groupId);
}
@@ -1318,42 +1383,6 @@ public class VersionsResource extends ApplicationResource {
}
- private void authorizeAffectedComponents(final AuthorizableLookup lookup, final Set<AffectedComponentEntity> affectedComponents) {
- final Map<String, List<AffectedComponentEntity>> componentsByType = affectedComponents.stream()
- .collect(Collectors.groupingBy(entity -> entity.getComponent().getReferenceType()));
-
- authorize(componentsByType.get(ComponentType.PROCESSOR.name()), id -> lookup.getProcessor(id).getAuthorizable());
- authorize(componentsByType.get(ComponentType.CONTROLLER_SERVICE.name()), id -> lookup.getControllerService(id).getAuthorizable());
-
- authorize(componentsByType.get(ComponentType.CONNECTION.name()), id -> lookup.getConnection(id).getAuthorizable());
- authorize(componentsByType.get(ComponentType.FUNNEL.name()), id -> lookup.getFunnel(id));
- authorize(componentsByType.get(ComponentType.INPUT_PORT.name()), id -> lookup.getInputPort(id));
- authorize(componentsByType.get(ComponentType.OUTPUT_PORT.name()), id -> lookup.getOutputPort(id));
- authorize(componentsByType.get(ComponentType.LABEL.name()), id -> lookup.getLabel(id));
-
- authorize(componentsByType.get(ComponentType.PROCESS_GROUP.name()), id -> lookup.getProcessGroup(id).getAuthorizable());
- authorize(componentsByType.get(ComponentType.REMOTE_PROCESS_GROUP.name()), id -> lookup.getRemoteProcessGroup(id));
-
-
- // Remote Input Ports and Remote Output Ports are not authorized independently but rather at the Remote Process Group level,
- // so we have to treat these a little differently.
- componentsByType.getOrDefault(ComponentType.REMOTE_INPUT_PORT.name(), Collections.emptyList()).stream()
- .forEach(affectedPort -> {
- final String rpgId = affectedPort.getComponent().getProcessGroupId();
- final Authorizable rpg = lookup.getRemoteProcessGroup(rpgId);
- rpg.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
- rpg.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
- });
-
- componentsByType.getOrDefault(ComponentType.REMOTE_OUTPUT_PORT.name(), Collections.emptyList()).stream()
- .forEach(affectedPort -> {
- final String rpgId = affectedPort.getComponent().getProcessGroupId();
- final Authorizable rpg = lookup.getRemoteProcessGroup(rpgId);
- rpg.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
- rpg.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
- });
- }
-
private Set<AffectedComponentEntity> getUpdatedEntities(final Set<AffectedComponentEntity> originalEntities, final NiFiUser user) {
final Set<AffectedComponentEntity> entities = new LinkedHashSet<>();
@@ -1373,17 +1402,6 @@ public class VersionsResource extends ApplicationResource {
}
- private void authorize(final List<AffectedComponentEntity> componentDtos, final Function<String, Authorizable> authFunction) {
- if (componentDtos != null) {
- for (final AffectedComponentEntity entity : componentDtos) {
- final Authorizable authorizable = authFunction.apply(entity.getComponent().getId());
- authorizable.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
- authorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
- }
- }
- }
-
-
public void setServiceFacade(NiFiServiceFacade serviceFacade) {
this.serviceFacade = serviceFacade;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsyncRequestManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsyncRequestManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsyncRequestManager.java
index 4b87b50..5dcb125 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsyncRequestManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsyncRequestManager.java
@@ -110,7 +110,6 @@ public class AsyncRequestManager<T> implements RequestManager<T> {
} catch (final Exception e) {
logger.error("Failed to perform asynchronous task", e);
request.setFailureReason("Encountered unexpected error when performing asynchronous task: " + e);
- request.setLastUpdated(new Date());
}
}
});
http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsynchronousWebRequest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsynchronousWebRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsynchronousWebRequest.java
index 2c14008..1309eee 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsynchronousWebRequest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsynchronousWebRequest.java
@@ -39,11 +39,23 @@ public interface AsynchronousWebRequest<T> {
Date getLastUpdated();
/**
- * Updates the Date at which the status of this request was last updated
+ * @return the current state of the request
+ */
+ public String getState();
+
+ /**
+ * @return the current percent complete, between 0 and 100 (inclusive)
+ */
+ public int getPercentComplete();
+
+ /**
+ * Updates the request to indicate the new state and percent complete
*
- * @param date the date at which the status of this request was last updated
+ * @param date the last updated time
+ * @param state the new state
+ * @param percentComplete The percentage complete, between 0 and 100 (inclusive)
*/
- void setLastUpdated(Date date);
+ void update(Date date, String state, int percentComplete);
/**
* @return the user who submitted the request
@@ -77,4 +89,14 @@ public interface AsynchronousWebRequest<T> {
* @return the results of the request, if it completed successfully, or <code>null</code> if the request either has no completed or failed
*/
T getResults();
+
+ /**
+ * Cancels the request so that no more steps can be completed
+ */
+ void cancel();
+
+ /**
+ * @return <code>true</code> if the request has been canceled, <code>false</code> otherwise
+ */
+ boolean isCancelled();
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java
index 8ba9a58..4810a32 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java
@@ -29,13 +29,17 @@ public class StandardAsynchronousWebRequest<T> implements AsynchronousWebRequest
private volatile boolean complete = false;
private volatile Date lastUpdated = new Date();
+ private volatile String state;
+ private volatile int percentComplete;
private volatile String failureReason;
+ private volatile boolean cancelled;
private volatile T results;
- public StandardAsynchronousWebRequest(final String requestId, final String processGroupId, final NiFiUser user) {
+ /**
+ * Creates a request record; each argument is stored as given, with no validation performed here.
+ *
+ * @param requestId the value stored as this request's id
+ * @param processGroupId the value stored as this request's process group id
+ * @param user the user stored for this request
+ * @param state the initial state text, kept until it is overwritten by a later state change
+ */
+ public StandardAsynchronousWebRequest(final String requestId, final String processGroupId, final NiFiUser user, final String state) {
this.id = requestId;
this.processGroupId = processGroupId;
this.user = user;
+ this.state = state;
}
public String getRequestId() {
@@ -57,6 +61,8 @@ public class StandardAsynchronousWebRequest<T> implements AsynchronousWebRequest
this.complete = true;
this.results = results;
this.lastUpdated = new Date();
+ this.percentComplete = 100;
+ this.state = "Complete";
}
@Override
@@ -65,8 +71,34 @@ public class StandardAsynchronousWebRequest<T> implements AsynchronousWebRequest
}
@Override
- public void setLastUpdated(final Date date) {
- this.lastUpdated = lastUpdated;
+ public String getState() {
+ return state;
+ }
+
+ @Override
+ public int getPercentComplete() {
+ return percentComplete;
+ }
+
+ @Override
+ public void update(Date date, String state, int percentComplete) {
+ // Reject an out-of-range percentage before consulting the request's own state.
+ final boolean percentInRange = percentComplete >= 0 && percentComplete <= 100;
+ if (!percentInRange) {
+ throw new IllegalArgumentException("Cannot set percent complete to a value of " + percentComplete + "; it must be between 0 and 100.");
+ }
+
+ // Checked before the completion test below, so a cancelled request always gets this message.
+ if (isCancelled()) {
+ throw new IllegalStateException("Cannot update state because request has already been cancelled by user");
+ }
+
+ if (isComplete()) {
+ // Describe how the request finished: a null failure reason means it succeeded.
+ final String failure = getFailureReason();
+ final String outcome;
+ if (failure == null) {
+ outcome = "successfully";
+ } else {
+ outcome = "with failure reason: " + failure;
+ }
+ throw new IllegalStateException("Cannot update state to '" + state + "' because request is already completed " + outcome);
+ }
+
+ // All checks passed: record the new timestamp, state and progress.
+ this.lastUpdated = date;
+ this.state = state;
+ this.percentComplete = percentComplete;
}
@Override
@@ -79,6 +111,7 @@ public class StandardAsynchronousWebRequest<T> implements AsynchronousWebRequest
this.failureReason = Objects.requireNonNull(explanation);
this.complete = true;
this.results = null;
+ this.lastUpdated = new Date();
}
@Override
@@ -90,4 +123,17 @@ public class StandardAsynchronousWebRequest<T> implements AsynchronousWebRequest
public T getResults() {
return results;
}
+
+ @Override
+ public void cancel() {
+ // Flag the cancellation first, then publish the terminal progress and state text.
+ this.cancelled = true;
+ this.percentComplete = 100;
+ this.state = "Canceled by user";
+ // Record the cancellation as this request's failure reason.
+ setFailureReason("Request cancelled by user");
+ }
+
+ @Override
+ public boolean isCancelled() {
+ // The flag is set to true by cancel().
+ return this.cancelled;
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index 3639b18..1c1e729 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -115,8 +115,12 @@ import org.apache.nifi.registry.ComponentVariableRegistry;
import org.apache.nifi.registry.flow.FlowRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.VersionControlInformation;
+import org.apache.nifi.registry.flow.VersionedComponent;
+import org.apache.nifi.registry.flow.diff.FlowComparison;
+import org.apache.nifi.registry.flow.diff.FlowDifference;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedConnection;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedControllerService;
+import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedFunnel;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedLabel;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedPort;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup;
@@ -2174,6 +2178,38 @@ public final class DtoFactory {
return dto;
}
+
+ /**
+ * Groups the differences of a flow comparison by the component each one applies to.
+ *
+ * @param comparison the comparison whose differences are to be grouped
+ * @return one DTO per distinct component (as determined by the DTO's equals/hashCode), each carrying the descriptions of its differences
+ */
+ public Set<ComponentDifferenceDTO> createComponentDifferenceDtos(final FlowComparison comparison) {
+ final Map<ComponentDifferenceDTO, List<String>> descriptionsByDto = new HashMap<>();
+
+ // Collect every difference description under the DTO built for its component.
+ for (final FlowDifference flowDifference : comparison.getDifferences()) {
+ descriptionsByDto
+ .computeIfAbsent(createComponentDifference(flowDifference), dto -> new ArrayList<>())
+ .add(flowDifference.getDescription());
+ }
+
+ // Attach each collected list to its DTO only after the grouping pass has finished.
+ descriptionsByDto.forEach((dto, descriptions) -> dto.setDifferences(descriptions));
+
+ return descriptionsByDto.keySet();
+ }
+
+ /**
+ * Creates the DTO that identifies which component a flow difference applies to.
+ * Component A of the difference is used when present; otherwise component B is used.
+ *
+ * @param difference the flow difference to describe
+ * @return a DTO carrying the component's id, name, type and the id of the group that contains it
+ */
+ private ComponentDifferenceDTO createComponentDifference(final FlowDifference difference) {
+ VersionedComponent component = difference.getComponentA();
+ if (component == null) {
+ component = difference.getComponentB();
+ }
+
+ final ComponentDifferenceDTO dto = new ComponentDifferenceDTO();
+ dto.setComponentId(component.getIdentifier());
+ dto.setComponentName(component.getName());
+ dto.setComponentType(component.getComponentType().name());
+ // Take the group id from the component; previously the DTO's own (still unset) value was copied onto itself.
+ dto.setProcessGroupId(component.getGroupIdentifier());
+ return dto;
+ }
+
+
public VersionControlInformationDTO createVersionControlInformationDto(final ProcessGroup group) {
if (group == null) {
return null;
@@ -2187,10 +2223,12 @@ public final class DtoFactory {
final VersionControlInformationDTO dto = new VersionControlInformationDTO();
dto.setGroupId(group.getIdentifier());
dto.setRegistryId(versionControlInfo.getRegistryIdentifier());
+ dto.setRegistryName(versionControlInfo.getRegistryName());
dto.setBucketId(versionControlInfo.getBucketIdentifier());
+ dto.setBucketName(versionControlInfo.getBucketName());
dto.setFlowId(versionControlInfo.getFlowIdentifier());
- // TODO - need to get flow name here
- dto.setFlowName(group.getName());
+ dto.setFlowName(versionControlInfo.getFlowName());
+ dto.setFlowDescription(versionControlInfo.getFlowDescription());
dto.setVersion(versionControlInfo.getVersion());
dto.setCurrent(versionControlInfo.getCurrent().orElse(null));
dto.setModified(versionControlInfo.getModified().orElse(null));
@@ -2204,6 +2242,9 @@ public final class DtoFactory {
group.getProcessors().stream()
.map(proc -> (InstantiatedVersionedProcessor) proc)
.forEach(proc -> mapping.put(proc.getInstanceId(), proc.getIdentifier()));
+ group.getFunnels().stream()
+ .map(funnel -> (InstantiatedVersionedFunnel) funnel)
+ .forEach(funnel -> mapping.put(funnel.getInstanceId(), funnel.getIdentifier()));
group.getInputPorts().stream()
.map(port -> (InstantiatedVersionedPort) port)
.forEach(port -> mapping.put(port.getInstanceId(), port.getIdentifier()));
@@ -2224,13 +2265,17 @@ public final class DtoFactory {
.forEach(rpg -> {
mapping.put(rpg.getInstanceId(), rpg.getIdentifier());
- rpg.getInputPorts().stream()
- .map(port -> (InstantiatedVersionedRemoteGroupPort) port)
- .forEach(port -> mapping.put(port.getInstanceId(), port.getIdentifier()));
+ if (rpg.getInputPorts() != null) {
+ rpg.getInputPorts().stream()
+ .map(port -> (InstantiatedVersionedRemoteGroupPort) port)
+ .forEach(port -> mapping.put(port.getInstanceId(), port.getIdentifier()));
+ }
- rpg.getOutputPorts().stream()
- .map(port -> (InstantiatedVersionedRemoteGroupPort) port)
- .forEach(port -> mapping.put(port.getInstanceId(), port.getIdentifier()));
+ if (rpg.getOutputPorts() != null) {
+ rpg.getOutputPorts().stream()
+ .map(port -> (InstantiatedVersionedRemoteGroupPort) port)
+ .forEach(port -> mapping.put(port.getInstanceId(), port.getIdentifier()));
+ }
});
group.getProcessGroups().stream()
@@ -3407,9 +3452,12 @@ public final class DtoFactory {
final VersionControlInformationDTO copy = new VersionControlInformationDTO();
copy.setRegistryId(original.getRegistryId());
+ copy.setRegistryName(original.getRegistryName());
copy.setBucketId(original.getBucketId());
+ copy.setBucketName(original.getBucketName());
copy.setFlowId(original.getFlowId());
copy.setFlowName(original.getFlowName());
+ copy.setFlowDescription(original.getFlowDescription());
copy.setVersion(original.getVersion());
copy.setCurrent(original.getCurrent());
copy.setModified(original.getModified());
http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/FlowRegistryDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/FlowRegistryDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/FlowRegistryDAO.java
index 19f2de4..4f5af74 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/FlowRegistryDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/FlowRegistryDAO.java
@@ -17,8 +17,14 @@
package org.apache.nifi.web.dao.impl;
+import java.io.IOException;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.registry.bucket.Bucket;
+import org.apache.nifi.registry.client.NiFiRegistryException;
import org.apache.nifi.registry.flow.FlowRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.VersionedFlow;
@@ -71,37 +77,48 @@ public class FlowRegistryDAO implements RegistryDAO {
throw new IllegalArgumentException("The specified registry id is unknown to this NiFi.");
}
- return flowRegistry.getBuckets(user);
- } catch (final IOException ioe) {
- throw new NiFiCoreException("Unable to obtain bucket listing: " + ioe.getMessage(), ioe);
+ final Set<Bucket> buckets = flowRegistry.getBuckets(user);
+ final Set<Bucket> sortedBuckets = new TreeSet<>((b1, b2) -> b1.getName().compareTo(b2.getName()));
+ sortedBuckets.addAll(buckets);
+ return sortedBuckets;
+ } catch (final IOException | NiFiRegistryException ioe) {
+ throw new NiFiCoreException("Unable to obtain listing of buckets: " + ioe, ioe);
}
}
@Override
public Set<VersionedFlow> getFlowsForUser(String registryId, String bucketId, NiFiUser user) {
- final Set<Bucket> bucketsForUser = getBucketsForUser(registryId, user);
+ try {
+ // Look up the registry by id; an unknown id is reported as IllegalArgumentException, which the catch below does not intercept.
+ final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(registryId);
+ if (flowRegistry == null) {
+ throw new IllegalArgumentException("The specified registry id is unknown to this NiFi.");
+ }
- // TODO - implement getBucket(bucketId, user)
- final Bucket bucket = bucketsForUser.stream().filter(b -> b.getIdentifier().equals(bucketId)).findFirst().orElse(null);
- if (bucket == null) {
- throw new IllegalArgumentException("The specified bucket is not available.");
+ // Fetch the bucket's flows from the registry and return them ordered by flow name.
+ // NOTE(review): the TreeSet keeps only one of any flows whose names compare equal, and a null name would fail in compareTo; confirm names are unique and non-null within a bucket.
+ final Set<VersionedFlow> flows = flowRegistry.getFlows(bucketId, user);
+ final Set<VersionedFlow> sortedFlows = new TreeSet<>((f1, f2) -> f1.getName().compareTo(f2.getName()));
+ sortedFlows.addAll(flows);
+ return sortedFlows;
+ } catch (final IOException | NiFiRegistryException ioe) {
+ // Wrap the checked exceptions, keeping the original as the cause.
+ throw new NiFiCoreException("Unable to obtain listing of flows for bucket with ID " + bucketId + ": " + ioe, ioe);
}
-
- return bucket.getVersionedFlows();
}
@Override
public Set<VersionedFlowSnapshotMetadata> getFlowVersionsForUser(String registryId, String bucketId, String flowId, NiFiUser user) {
- final Set<VersionedFlow> flowsForUser = getFlowsForUser(registryId, bucketId, user);
+ try {
+ // Look up the registry by id; an unknown id is reported as IllegalArgumentException, which the catch below does not intercept.
+ final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(registryId);
+ if (flowRegistry == null) {
+ throw new IllegalArgumentException("The specified registry id is unknown to this NiFi.");
+ }
- // TODO - implement getFlow(bucketId, flowId, user)
- final VersionedFlow versionedFlow = flowsForUser.stream().filter(vf -> vf.getIdentifier().equals(flowId)).findFirst().orElse(null);
- if (versionedFlow == null) {
- throw new IllegalArgumentException("The specified flow is not available.");
+ // Fetch the flow's versions from the registry and return them in ascending version order.
+ // The TreeSet keeps a single entry per version number, because the comparator looks only at getVersion().
+ final Set<VersionedFlowSnapshotMetadata> flowVersions = flowRegistry.getFlowVersions(bucketId, flowId, user);
+ final Set<VersionedFlowSnapshotMetadata> sortedFlowVersions = new TreeSet<>((f1, f2) -> Integer.compare(f1.getVersion(), f2.getVersion()));
+ sortedFlowVersions.addAll(flowVersions);
+ return sortedFlowVersions;
+ } catch (final IOException | NiFiRegistryException ioe) {
+ // Wrap the checked exceptions, keeping the original as the cause.
+ throw new NiFiCoreException("Unable to obtain listing of versions for bucket with ID " + bucketId + " and flow with ID " + flowId + ": " + ioe, ioe);
}
-
- return versionedFlow.getSnapshotMetadata();
}
@Override
http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardInputPortDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardInputPortDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardInputPortDAO.java
index 35c537d..f830e9b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardInputPortDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardInputPortDAO.java
@@ -38,7 +38,11 @@ public class StandardInputPortDAO extends ComponentDAO implements PortDAO {
private Port locatePort(final String portId) {
final ProcessGroup rootGroup = flowController.getGroup(flowController.getRootGroupId());
- final Port port = rootGroup.findInputPort(portId);
+ Port port = rootGroup.findInputPort(portId);
+
+ if (port == null) {
+ port = rootGroup.findOutputPort(portId);
+ }
if (port == null) {
throw new ResourceNotFoundException(String.format("Unable to find port with id '%s'.", portId));
@@ -50,7 +54,7 @@ public class StandardInputPortDAO extends ComponentDAO implements PortDAO {
@Override
public boolean hasPort(String portId) {
final ProcessGroup rootGroup = flowController.getGroup(flowController.getRootGroupId());
- return rootGroup.findInputPort(portId) != null;
+ // Reports true when the id resolves to either an input port or an output port, matching the output-port fallback in locatePort.
+ return rootGroup.findInputPort(portId) != null || rootGroup.findOutputPort(portId) != null;
}
@Override
http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
index 963220e..78f3e31 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
@@ -25,6 +25,7 @@ import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.registry.flow.FlowRegistry;
import org.apache.nifi.registry.flow.StandardVersionControlInformation;
import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
@@ -238,11 +239,15 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
final ProcessGroup group = locateProcessGroup(flowController, groupId);
final String registryId = versionControlInformation.getRegistryId();
- final String bucketId = versionControlInformation.getBucketId();
- final String flowId = versionControlInformation.getFlowId();
- final int version = versionControlInformation.getVersion();
+ final FlowRegistry flowRegistry = flowController.getFlowRegistryClient().getFlowRegistry(registryId);
+ final String registryName = flowRegistry == null ? registryId : flowRegistry.getName();
+
+ final StandardVersionControlInformation vci = StandardVersionControlInformation.Builder.fromDto(versionControlInformation)
+ .registryName(registryName)
+ .modified(false)
+ .current(true)
+ .build();
- final VersionControlInformation vci = new StandardVersionControlInformation(registryId, bucketId, flowId, version, null, false, true);
group.setVersionControlInformation(vci, versionedComponentMapping);
return group;
@@ -261,14 +266,9 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
final ProcessGroup group = locateProcessGroup(flowController, groupId);
group.updateFlow(proposedSnapshot, componentIdSeed, verifyNotModified, updateSettings);
- final StandardVersionControlInformation svci = new StandardVersionControlInformation(
- versionControlInformation.getRegistryId(),
- versionControlInformation.getBucketId(),
- versionControlInformation.getFlowId(),
- versionControlInformation.getVersion(),
- proposedSnapshot.getFlowContents(),
- versionControlInformation.getModified(),
- versionControlInformation.getCurrent());
+ final StandardVersionControlInformation svci = StandardVersionControlInformation.Builder.fromDto(versionControlInformation)
+ .flowSnapshot(proposedSnapshot.getFlowContents())
+ .build();
group.setVersionControlInformation(svci, Collections.emptyMap());
return group;
http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/CancellableTimedPause.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/CancellableTimedPause.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/CancellableTimedPause.java
index a6efb71..1f83a6f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/CancellableTimedPause.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/CancellableTimedPause.java
@@ -24,9 +24,10 @@ public class CancellableTimedPause implements Pause {
private final long pauseNanos;
private volatile boolean cancelled = false;
- public CancellableTimedPause(final long pauseTime, final long expirationTime, final TimeUnit timeUnit) {
- final long expirationNanos = TimeUnit.NANOSECONDS.convert(expirationTime, timeUnit);
- expirationNanoTime = System.nanoTime() + expirationNanos;
+ /**
+ * Creates a pause with a fixed pause length and an expiration deadline measured from now.
+ *
+ * @param pauseTime the length of each pause, in <code>timeUnit</code>; a value that converts to less than one nanosecond is raised to one nanosecond
+ * @param expirationPeriod how long from now until the deadline, in <code>timeUnit</code>
+ * @param timeUnit the unit of both time arguments
+ */
+ public CancellableTimedPause(final long pauseTime, final long expirationPeriod, final TimeUnit timeUnit) {
+ final long expirationNanos = TimeUnit.NANOSECONDS.convert(expirationPeriod, timeUnit);
+ final long now = System.nanoTime();
+ // System.nanoTime() may return negative values, so the sign of the sum cannot be used to detect overflow.
+ // Saturate only when adding a positive period would actually exceed Long.MAX_VALUE.
+ final boolean overflows = expirationNanos > 0 && now > Long.MAX_VALUE - expirationNanos;
+ expirationNanoTime = overflows ? Long.MAX_VALUE : now + expirationNanos;
pauseNanos = Math.max(1L, TimeUnit.NANOSECONDS.convert(pauseTime, timeUnit));
}
@@ -44,7 +45,7 @@ public class CancellableTimedPause implements Pause {
final long maxWaitTime = System.nanoTime() + pauseNanos;
while (sysTime < maxWaitTime) {
try {
- TimeUnit.NANOSECONDS.wait(pauseNanos);
+ TimeUnit.NANOSECONDS.sleep(pauseNanos);
} catch (final InterruptedException ie) {
Thread.currentThread().interrupt();
return false;
http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 490862c..584cac6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1641,6 +1641,12 @@
<version>${nifi.registry.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.nifi.registry</groupId>
+ <artifactId>nifi-registry-client</artifactId>
+ <version>${nifi.registry.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
<version>2.0.0</version>