You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2016/07/12 21:00:10 UTC
[7/9] nifi git commit: NIFI-2095: - Adding a page for managing users
and groups. - Adding a page for managing access policies. - Renaming
accessPolicy in entity to permissions to avoid confusion with the
accessPolicy model. - Adding an Authorizable for a
http://git-wip-us.apache.org/repos/asf/nifi/blob/e0c96794/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/AccessPolicyResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/AccessPolicyResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/AccessPolicyResource.java
index dd537cd..2344a5c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/AccessPolicyResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/AccessPolicyResource.java
@@ -98,6 +98,74 @@ public class AccessPolicyResource extends ApplicationResource {
return accessPolicy;
}
+ // -----------------
+ // get access policy
+ // -----------------
+
+    /**
+     * Looks up the access policy that applies to an action on a resource.
+     *
+     * @param action      the request action; parsed with {@code RequestAction.valueOfValue}
+     * @param rawResource the resource of the policy as captured from the request path; a leading
+     *                    "/" is prepended before it is used
+     * @return An accessPolicyEntity.
+     */
+    @GET
+    @Consumes(MediaType.WILDCARD)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("{action}/{resource: .+}")
+    // TODO - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
+    @ApiOperation(
+            value = "Gets an access policy",
+            response = AccessPolicyEntity.class,
+            authorizations = {
+                    @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
+                    @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"),
+                    @Authorization(value = "Administrator", type = "ROLE_ADMIN")
+            }
+    )
+    @ApiResponses(
+            value = {
+                    @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+                    @ApiResponse(code = 401, message = "Client could not be authenticated."),
+                    @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+                    @ApiResponse(code = 404, message = "The specified resource could not be found."),
+                    @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+            }
+    )
+    public Response getAccessPolicyForResource(
+            @ApiParam(
+                    value = "The request action.",
+                    allowableValues = "read, write",
+                    required = true
+            ) @PathParam("action") final String action,
+            @ApiParam(
+                    value = "The resource of the policy.",
+                    required = true
+            ) @PathParam("resource") String rawResource) {
+
+        // resolve the requested action and rebuild the absolute resource path
+        final RequestAction policyAction = RequestAction.valueOfValue(action);
+        final String policyResource = "/" + rawResource;
+
+        // hand the request off when it has to be replicated
+        if (isReplicateRequest()) {
+            return replicate(HttpMethod.GET);
+        }
+
+        // the caller needs READ access on the policy that guards the resource
+        serviceFacade.authorizeAccess(lookup ->
+                lookup.getAccessPolicyByResource(policyResource)
+                        .authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()));
+
+        // fetch the policy and fill in the remaining entity content
+        final AccessPolicyEntity policyEntity = serviceFacade.getAccessPolicy(policyAction, policyResource);
+        populateRemainingAccessPolicyEntityContent(policyEntity);
+
+        return clusterContext(generateOkResponse(policyEntity)).build();
+    }
+
+ // -----------------------
+ // manage an access policy
+ // -----------------------
+
/**
* Creates a new access policy.
*
@@ -111,58 +179,6 @@ public class AccessPolicyResource extends ApplicationResource {
// TODO - @PreAuthorize("hasRole('ROLE_DFM')")
@ApiOperation(
value = "Creates an access policy",
- notes = " Available resources:\n" +
- " /flow - READ - allows user/entity to load the UI and see the flow structure\n" +
- " - WRITE - NA\n" +
- " /resource - READ - allows user/entity to retrieve the available resources\n" +
- " - WRITE - NA\n" +
- " /system - READ - allows user/entity to retrieve system level diagnostics (CPU load, disk utilization, etc)\n" +
- " - WRITE - NA\n" +
- " /controller - READ - allows user/entity to retrieve configuration details for the controller (controller bulletins, thread pool, reporting tasks, etc)\n" +
- " - WRITE - allows user/entity to modify configuration details for the controller\n" +
- " /provenance - READ - allows user/entity to perform provenance requests. results will be filtered based on access to provenance data per component\n" +
- " - WRITE - NA\n" +
- " /token - READ - NA\n" +
- " - WRITE - allows user/entity to create a token for access the REST API\n" +
- " /site-to-site - READ - allows user/entity to retrieve configuration details for performing site to site data transfers with this NiFi\n" +
- " - WRITE - NA\n" +
- " /proxy - READ - NA\n" +
- " - WRITE - allows user/entity to create a proxy request on behalf of another user\n" +
- " /process-groups/{id} - READ - allows user/entity to retrieve configuration details for the process group and all descendant components without explicit " +
- "access policies\n" +
- " - WRITE - allows user/entity to create/update/delete configuration details for the process group and all descendant components without " +
- "explicit access policies\n" +
- " /processors/{id} - READ - allows user/entity to retrieve configuration details for the processor overriding any inherited authorizations from an ancestor " +
- "process group\n" +
- " - WRITE - allows user/entity to update/delete the processor overriding any inherited authorizations from an ancestor process group\n" +
- " /input-ports/{id} - READ - allows user/entity to retrieve configuration details for the input port overriding any inherited authorizations from an ancestor " +
- "process group\n" +
- " - WRITE - allows user/entity to update/delete the input port overriding any inherited authorizations from an ancestor process group\n" +
- " /output-ports/{id} - READ - allows user/entity to retrieve configuration details for the output port overriding any inherited authorizations from an ancestor " +
- "process group\n" +
- " - WRITE - allows user/entity to update/delete the output port overriding any inherited authorizations from an ancestor process group\n" +
- " /labels/{id} - READ - allows user/entity to retrieve configuration details for the label overriding any inherited authorizations from an ancestor " +
- "process group\n" +
- " - WRITE - allows user/entity to update/delete the label overriding any inherited authorizations from an ancestor process group\n" +
- " /connections/{id} - READ - allows user/entity to retrieve configuration details for the connection overriding any inherited authorizations from an ancestor " +
- "process group\n" +
- " - WRITE - allows user/entity to update/delete the label overriding any inherited authorizations from an ancestor process group\n" +
- " /remote-process-groups/{id} - READ - allows user/entity to retrieve configuration details for the remote process group overriding any inherited authorizations from an " +
- "ancestor process group\n" +
- " - WRITE - allows user/entity to update/delete the remote process group overriding any inherited authorizations from an ancestor process " +
- "group\n" +
- " /templates/{id} - READ - allows user/entity to retrieve configuration details for the template overriding any inherited authorizations from an ancestor " +
- "process group\n" +
- " - WRITE - allows user/entity to create/update/delete the template overriding any inherited authorizations from an ancestor process group\n" +
- " /controller-services/{id} - READ - allows user/entity to retrieve configuration details for the controller service overriding any inherited authorizations from an " +
- "ancestor process group\n" +
- " - WRITE - allows user/entity to update/delete the controller service overriding any inherited authorizations from an ancestor process " +
- "group\n" +
- " /reporting-tasks/{id} - READ - allows user/entity to retrieve configuration details for the reporting tasks overriding any inherited authorizations from the " +
- "controller\n" +
- " - WRITE - allows user/entity to create/update/delete the reporting tasks overriding any inherited authorizations from the controller\n" +
- " /{type}/{id}/provenance - READ - allows user/entity to view provenance data from the underlying component\n" +
- " - WRITE - NA\n",
response = AccessPolicyEntity.class,
authorizations = {
@Authorization(value = "Data Flow Manager", type = "ROLE_DFM")
@@ -192,10 +208,18 @@ public class AccessPolicyResource extends ApplicationResource {
throw new IllegalArgumentException("A revision of 0 must be specified when creating a new Policy.");
}
- if (accessPolicyEntity.getComponent().getId() != null) {
+ final AccessPolicyDTO requestAccessPolicy = accessPolicyEntity.getComponent();
+ if (requestAccessPolicy.getId() != null) {
throw new IllegalArgumentException("Access policy ID cannot be specified.");
}
+ if (requestAccessPolicy.getResource() == null) {
+ throw new IllegalArgumentException("Access policy resource must be specified.");
+ }
+
+ // ensure this is a valid action
+ RequestAction.valueOfValue(requestAccessPolicy.getAction());
+
if (isReplicateRequest()) {
return replicate(HttpMethod.POST, accessPolicyEntity);
}
@@ -205,7 +229,7 @@ public class AccessPolicyResource extends ApplicationResource {
if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) {
// authorize access
serviceFacade.authorizeAccess(lookup -> {
- final Authorizable accessPolicies = lookup.getAccessPoliciesAuthorizable();
+ final Authorizable accessPolicies = lookup.getAccessPolicyByResource(requestAccessPolicy.getResource());
accessPolicies.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
});
}
@@ -270,8 +294,8 @@ public class AccessPolicyResource extends ApplicationResource {
// authorize access
serviceFacade.authorizeAccess(lookup -> {
- final Authorizable accessPolicy = lookup.getAccessPolicyAuthorizable(id);
- accessPolicy.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
+ Authorizable authorizable = lookup.getAccessPolicyById(id);
+ authorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
});
// get the access policy
@@ -347,8 +371,8 @@ public class AccessPolicyResource extends ApplicationResource {
serviceFacade,
revision,
lookup -> {
- Authorizable authorizable = lookup.getAccessPolicyAuthorizable(id);
- authorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
+ Authorizable authorizable = lookup.getAccessPolicyById(id);
+ authorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
},
null,
() -> {
@@ -422,8 +446,8 @@ public class AccessPolicyResource extends ApplicationResource {
serviceFacade,
revision,
lookup -> {
- final Authorizable accessPolicy = lookup.getAccessPolicyAuthorizable(id);
- accessPolicy.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
+ final Authorizable accessPolicy = lookup.getAccessPolicyById(id);
+ accessPolicy.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
},
() -> {
},
http://git-wip-us.apache.org/repos/asf/nifi/blob/e0c96794/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
index 42f836c..faa1ab4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
@@ -16,31 +16,10 @@
*/
package org.apache.nifi.web.api;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.charset.StandardCharsets;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.UUID;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
-
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import javax.ws.rs.core.CacheControl;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MultivaluedMap;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.Response.ResponseBuilder;
-import javax.ws.rs.core.UriBuilder;
-import javax.ws.rs.core.UriBuilderException;
-import javax.ws.rs.core.UriInfo;
-
+import com.sun.jersey.api.core.HttpContext;
+import com.sun.jersey.api.representation.Form;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
+import com.sun.jersey.server.impl.model.method.dispatch.FormDispatchProvider;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.authorization.RequestAction;
@@ -54,6 +33,13 @@ import org.apache.nifi.cluster.manager.exception.NoClusterCoordinatorException;
import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.controller.Snippet;
+import org.apache.nifi.remote.HttpRemoteSiteListener;
+import org.apache.nifi.remote.VersionNegotiator;
+import org.apache.nifi.remote.exception.BadRequestException;
+import org.apache.nifi.remote.exception.HandshakeException;
+import org.apache.nifi.remote.exception.NotAuthorizedException;
+import org.apache.nifi.remote.protocol.ResponseCode;
+import org.apache.nifi.remote.protocol.http.HttpHeaders;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.AuthorizableLookup;
import org.apache.nifi.web.AuthorizeAccess;
@@ -62,14 +48,40 @@ import org.apache.nifi.web.Revision;
import org.apache.nifi.web.api.dto.RevisionDTO;
import org.apache.nifi.web.api.dto.SnippetDTO;
import org.apache.nifi.web.api.entity.ComponentEntity;
+import org.apache.nifi.web.api.entity.TransactionResultEntity;
import org.apache.nifi.web.api.request.ClientIdParameter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.sun.jersey.api.core.HttpContext;
-import com.sun.jersey.api.representation.Form;
-import com.sun.jersey.core.util.MultivaluedMapImpl;
-import com.sun.jersey.server.impl.model.method.dispatch.FormDispatchProvider;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.core.CacheControl;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.ResponseBuilder;
+import javax.ws.rs.core.UriBuilder;
+import javax.ws.rs.core.UriBuilderException;
+import javax.ws.rs.core.UriInfo;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static javax.ws.rs.core.Response.Status.NOT_FOUND;
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_NAME;
+import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_VALUE;
/**
* Base class for controllers.
@@ -712,4 +724,152 @@ public abstract class ApplicationResource {
public static enum ReplicationTarget {
CLUSTER_NODES, CLUSTER_COORDINATOR;
}
+
+ // -----------------
+ // HTTP site to site
+ // -----------------
+
+    /**
+     * Works out which transport protocol version to use for a site-to-site request.
+     *
+     * The version requested in the {@code HttpHeaders.PROTOCOL_VERSION} header is returned as-is when the
+     * negotiator supports it; otherwise the negotiator's preferred version for that request is returned.
+     *
+     * @param req the incoming request carrying the protocol version header
+     * @param transportProtocolVersionNegotiator the negotiator consulted for supported/preferred versions
+     * @return the protocol version to use
+     * @throws BadRequestException if the header is missing or empty, is not a valid integer,
+     *                             or no preferred version is available
+     */
+    protected Integer negotiateTransportProtocolVersion(final HttpServletRequest req, final VersionNegotiator transportProtocolVersionNegotiator) throws BadRequestException {
+        final String versionHeader = req.getHeader(HttpHeaders.PROTOCOL_VERSION);
+        if (isEmpty(versionHeader)) {
+            throw new BadRequestException("Protocol version was not specified.");
+        }
+
+        final Integer requested;
+        try {
+            requested = Integer.valueOf(versionHeader);
+        } catch (NumberFormatException e) {
+            throw new BadRequestException("Specified protocol version was not in a valid number format: " + versionHeader);
+        }
+
+        // the requested version wins whenever the server supports it
+        if (transportProtocolVersionNegotiator.isVersionSupported(requested)) {
+            return requested;
+        }
+
+        // otherwise fall back to whatever the negotiator prefers for this request
+        final Integer preferred = transportProtocolVersionNegotiator.getPreferredVersion(requested);
+        if (preferred == null) {
+            throw new BadRequestException("Specified protocol version is not supported: " + versionHeader);
+        }
+        return preferred;
+    }
+
+ protected Response.ResponseBuilder setCommonHeaders(final Response.ResponseBuilder builder, final Integer transportProtocolVersion, final HttpRemoteSiteListener transactionManager) {
+ return builder.header(HttpHeaders.PROTOCOL_VERSION, transportProtocolVersion)
+ .header(HttpHeaders.SERVER_SIDE_TRANSACTION_TTL, transactionManager.getTransactionTtlSec());
+ }
+
+    /**
+     * Builds the HTTP responses used by the HTTP site-to-site endpoints. Error responses carry a
+     * {@link TransactionResultEntity} with a response code, a message and a flow file count of 0.
+     */
+    protected class ResponseCreator {
+
+        /**
+         * @param errMsg the plain-text body to send
+         * @return a 403 text/plain response built through {@code noCache}
+         */
+        public Response nodeTypeErrorResponse(String errMsg) {
+            return noCache(Response.status(Response.Status.FORBIDDEN)).type(MediaType.TEXT_PLAIN).entity(errMsg).build();
+        }
+
+        /**
+         * @return a 403 text/plain response stating that HTTP(S) site-to-site is disabled
+         */
+        public Response httpSiteToSiteIsNotEnabledResponse() {
+            return noCache(Response.status(Response.Status.FORBIDDEN)).type(MediaType.TEXT_PLAIN).entity("HTTP(S) Site-to-Site is not enabled on this host.").build();
+        }
+
+        /**
+         * @param portType the port type taken from the request (logged at debug level only)
+         * @param portId the port id taken from the request (logged at debug level only)
+         * @return a 404 JSON response with an ABORT result entity
+         */
+        public Response wrongPortTypeResponse(String portType, String portId) {
+            logger.debug("Port type was wrong. portType={}, portId={}", portType, portId);
+            final TransactionResultEntity entity = newResultEntity(ResponseCode.ABORT, "Port was not found.");
+            return jsonResponse(Response.status(NOT_FOUND), entity);
+        }
+
+        /**
+         * @param portId the port id (logged at debug level only)
+         * @param transactionId the transaction id (logged at debug level only)
+         * @return a 404 JSON response with an ABORT result entity
+         */
+        public Response transactionNotFoundResponse(String portId, String transactionId) {
+            logger.debug("Transaction was not found. portId={}, transactionId={}", portId, transactionId);
+            final TransactionResultEntity entity = newResultEntity(ResponseCode.ABORT, "Transaction was not found.");
+            return jsonResponse(Response.status(NOT_FOUND), entity);
+        }
+
+        /**
+         * @param portId the port id being processed when the failure occurred
+         * @param e the unexpected exception; logged with its stack trace
+         * @return a server-error JSON response with an ABORT result entity
+         */
+        public Response unexpectedErrorResponse(String portId, Exception e) {
+            logger.error("Unexpected exception occurred. portId={}", portId);
+            logger.error("Exception detail:", e);
+            final TransactionResultEntity entity = newResultEntity(ResponseCode.ABORT, "Server encountered an exception.");
+            return jsonResponse(Response.serverError(), entity);
+        }
+
+        /**
+         * @param portId the port id being processed when the failure occurred
+         * @param transactionId the transaction being processed when the failure occurred
+         * @param e the unexpected exception; logged with its stack trace
+         * @return a server-error JSON response with an ABORT result entity
+         */
+        public Response unexpectedErrorResponse(String portId, String transactionId, Exception e) {
+            logger.error("Unexpected exception occurred. portId={}, transactionId={}", portId, transactionId);
+            logger.error("Exception detail:", e);
+            final TransactionResultEntity entity = newResultEntity(ResponseCode.ABORT, "Server encountered an exception.");
+            return jsonResponse(Response.serverError(), entity);
+        }
+
+        /**
+         * @param e the authorization failure whose message is reported to the client
+         * @return a 401 JSON response with an UNAUTHORIZED result entity
+         */
+        public Response unauthorizedResponse(NotAuthorizedException e) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Client request was not authorized. {}", e.getMessage());
+            }
+            final TransactionResultEntity entity = newResultEntity(ResponseCode.UNAUTHORIZED, e.getMessage());
+            // send the result entity itself; previously it was built and then discarded in favour of
+            // the bare message string, although the response is declared as JSON
+            return jsonResponse(Response.status(Response.Status.UNAUTHORIZED), entity);
+        }
+
+        /**
+         * @param e the exception describing why the request is invalid
+         * @return a 400 JSON response with an ABORT result entity
+         */
+        public Response badRequestResponse(Exception e) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Client sent a bad request. {}", e.getMessage());
+            }
+            final TransactionResultEntity entity = newResultEntity(ResponseCode.ABORT, e.getMessage());
+            return jsonResponse(Response.status(Response.Status.BAD_REQUEST), entity);
+        }
+
+        /**
+         * Maps a failed handshake to an HTTP response. The entity carries the handshake's response
+         * code, or ABORT when the exception has none.
+         *
+         * @param e the handshake failure
+         * @return a JSON response whose status depends on the handshake response code
+         */
+        public Response handshakeExceptionResponse(HandshakeException e) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Handshake failed, {}", e.getMessage());
+            }
+            final ResponseCode handshakeRes = e.getResponseCode();
+            final TransactionResultEntity entity = newResultEntity(handshakeRes != null ? handshakeRes : ResponseCode.ABORT, e.getMessage());
+            return jsonResponse(Response.status(toHttpStatus(handshakeRes)), entity);
+        }
+
+        /**
+         * @param transactionManager the listener providing the transaction TTL header value
+         * @param entity the response body
+         * @param protocolVersion the negotiated transport protocol version
+         * @return a 202 response carrying the common site-to-site headers
+         */
+        public Response acceptedResponse(final HttpRemoteSiteListener transactionManager, final Object entity, final Integer protocolVersion) {
+            return noCache(setCommonHeaders(Response.status(Response.Status.ACCEPTED), protocolVersion, transactionManager))
+                    .entity(entity).build();
+        }
+
+        /**
+         * @param uriInfo used to resolve the base URI of the transaction location
+         * @param portType the port type path segment
+         * @param portId the port id path segment
+         * @param transactionId the id of the created transaction
+         * @param entity the response body
+         * @param protocolVersion the negotiated transport protocol version
+         * @param transactionManager the listener providing the transaction TTL header value
+         * @return a 201 response whose location points at the transaction resource
+         */
+        public Response locationResponse(UriInfo uriInfo, String portType, String portId, String transactionId, Object entity,
+                Integer protocolVersion, final HttpRemoteSiteListener transactionManager) {
+
+            String path = "/data-transfer/" + portType + "/" + portId + "/transactions/" + transactionId;
+            URI location = uriInfo.getBaseUriBuilder().path(path).build();
+            return noCache(setCommonHeaders(Response.created(location), protocolVersion, transactionManager)
+                    .header(LOCATION_URI_INTENT_NAME, LOCATION_URI_INTENT_VALUE))
+                    .entity(entity).build();
+        }
+
+        /** Creates a result entity with the given code and message and a flow file count of 0. */
+        private TransactionResultEntity newResultEntity(final ResponseCode code, final String message) {
+            final TransactionResultEntity entity = new TransactionResultEntity();
+            entity.setResponseCode(code.getCode());
+            entity.setMessage(message);
+            entity.setFlowFileSent(0);
+            return entity;
+        }
+
+        /** Finishes the given builder as an application/json response carrying the entity. */
+        private Response jsonResponse(final Response.ResponseBuilder builder, final TransactionResultEntity entity) {
+            return builder.type(MediaType.APPLICATION_JSON_TYPE).entity(entity).build();
+        }
+
+        /**
+         * Translates a handshake response code into an HTTP status. A null code is treated like any
+         * other unrecognised code (400), because switching on a null enum throws NullPointerException.
+         */
+        private Response.Status toHttpStatus(final ResponseCode handshakeRes) {
+            if (handshakeRes == null) {
+                return Response.Status.BAD_REQUEST;
+            }
+            switch (handshakeRes) {
+                case PORT_NOT_IN_VALID_STATE:
+                case PORTS_DESTINATION_FULL:
+                    return Response.Status.SERVICE_UNAVAILABLE;
+                case UNAUTHORIZED:
+                    return Response.Status.UNAUTHORIZED;
+                case UNKNOWN_PORT:
+                    return NOT_FOUND;
+                default:
+                    return Response.Status.BAD_REQUEST;
+            }
+        }
+
+    }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/e0c96794/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java
new file mode 100644
index 0000000..aad8b4a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java
@@ -0,0 +1,837 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api;
+
+import com.wordnik.swagger.annotations.Api;
+import com.wordnik.swagger.annotations.ApiOperation;
+import com.wordnik.swagger.annotations.ApiParam;
+import com.wordnik.swagger.annotations.ApiResponse;
+import com.wordnik.swagger.annotations.ApiResponses;
+import com.wordnik.swagger.annotations.Authorization;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.authorization.AccessDeniedException;
+import org.apache.nifi.authorization.AuthorizationRequest;
+import org.apache.nifi.authorization.AuthorizationResult;
+import org.apache.nifi.authorization.AuthorizationResult.Result;
+import org.apache.nifi.authorization.Authorizer;
+import org.apache.nifi.authorization.RequestAction;
+import org.apache.nifi.authorization.Resource;
+import org.apache.nifi.authorization.resource.ResourceFactory;
+import org.apache.nifi.authorization.resource.ResourceType;
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.authorization.user.NiFiUserUtils;
+import org.apache.nifi.remote.HttpRemoteSiteListener;
+import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.PeerDescription;
+import org.apache.nifi.remote.StandardVersionNegotiator;
+import org.apache.nifi.remote.VersionNegotiator;
+import org.apache.nifi.remote.client.http.TransportProtocolVersionNegotiator;
+import org.apache.nifi.remote.exception.BadRequestException;
+import org.apache.nifi.remote.exception.HandshakeException;
+import org.apache.nifi.remote.exception.NotAuthorizedException;
+import org.apache.nifi.remote.exception.RequestExpiredException;
+import org.apache.nifi.remote.io.http.HttpOutput;
+import org.apache.nifi.remote.io.http.HttpServerCommunicationsSession;
+import org.apache.nifi.remote.protocol.HandshakeProperty;
+import org.apache.nifi.remote.protocol.ResponseCode;
+import org.apache.nifi.remote.protocol.http.HttpFlowFileServerProtocol;
+import org.apache.nifi.remote.protocol.http.HttpFlowFileServerProtocolImpl;
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
+import org.apache.nifi.web.api.entity.TransactionResultEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.StreamingOutput;
+import javax.ws.rs.core.UriInfo;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+import static org.apache.nifi.remote.protocol.HandshakeProperty.BATCH_COUNT;
+import static org.apache.nifi.remote.protocol.HandshakeProperty.BATCH_DURATION;
+import static org.apache.nifi.remote.protocol.HandshakeProperty.BATCH_SIZE;
+import static org.apache.nifi.remote.protocol.HandshakeProperty.REQUEST_EXPIRATION_MILLIS;
+import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_COUNT;
+import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_DURATION;
+import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_SIZE;
+import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_REQUEST_EXPIRATION;
+import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_USE_COMPRESSION;
+
+/**
+ * RESTful endpoint for managing a SiteToSite connection.
+ */
+@Path("/data-transfer")
+@Api(
+ value = "/data-transfer",
+ description = "Supports data transfers with this NiFi using HTTP based site to site"
+)
+public class DataTransferResource extends ApplicationResource {
+
+ private static final Logger logger = LoggerFactory.getLogger(DataTransferResource.class);
+
+ public static final String CHECK_SUM = "checksum";
+ public static final String RESPONSE_CODE = "responseCode";
+
+
+ private static final String PORT_TYPE_INPUT = "input-ports";
+ private static final String PORT_TYPE_OUTPUT = "output-ports";
+
+ private Authorizer authorizer;
+ private final ResponseCreator responseCreator = new ResponseCreator();
+ private final VersionNegotiator transportProtocolVersionNegotiator = new TransportProtocolVersionNegotiator(1);
+ private final HttpRemoteSiteListener transactionManager = HttpRemoteSiteListener.getInstance();
+
+    /**
+     * Checks that the current user is allowed to transfer data through the given port.
+     *
+     * Note: Protected for testing purposes
+     *
+     * @param resourceType the type of the port; must be an input port or an output port
+     * @param identifier the identifier of the port
+     * @throws IllegalArgumentException if the resource type is neither an input nor an output port
+     * @throws AccessDeniedException if the authorizer does not approve the request
+     */
+    protected void authorizeDataTransfer(final ResourceType resourceType, final String identifier) {
+        final NiFiUser currentUser = NiFiUserUtils.getNiFiUser();
+
+        final boolean isPort = ResourceType.InputPort.equals(resourceType) || ResourceType.OutputPort.equals(resourceType);
+        if (!isPort) {
+            throw new IllegalArgumentException("The resource must be an Input or Output Port.");
+        }
+
+        // TODO - use DataTransferAuthorizable after looking up underlying component for consistency
+        final Resource portResource = ResourceFactory.getComponentResource(resourceType, identifier, identifier);
+        final AuthorizationRequest.Builder requestBuilder = new AuthorizationRequest.Builder()
+                .resource(ResourceFactory.getDataTransferResource(portResource))
+                .identity(currentUser.getIdentity())
+                .anonymous(currentUser.isAnonymous())
+                .accessAttempt(true)
+                .action(RequestAction.WRITE);
+
+        final AuthorizationResult outcome = authorizer.authorize(requestBuilder.build());
+        if (Result.Approved.equals(outcome.getResult())) {
+            return;
+        }
+
+        // prefer the authorizer's explanation, falling back to a generic message
+        final String explanation = outcome.getExplanation();
+        if (StringUtils.isNotBlank(explanation)) {
+            throw new AccessDeniedException(explanation);
+        }
+        throw new AccessDeniedException("Access is denied");
+    }
+
+ /**
+  * Creates a new site-to-site transaction for the given input or output port.
+  *
+  * The port type is checked first, then data-transfer access is authorized and the
+  * request is validated. A transaction id is then allocated and the handshake is run.
+  * If the handshake (or anything after it) fails, the freshly created transaction is cancelled.
+  *
+  * @param portType either {@code PORT_TYPE_INPUT} or {@code PORT_TYPE_OUTPUT}; anything else yields the wrong-port-type response
+  * @param portId the id of the port the transaction is created for
+  * @param req the servlet request, used for validation and for building the peer
+  * @param context the servlet context (not used by this method)
+  * @param uriInfo used to build the location response for the new transaction
+  * @param inputStream the request body, handed to the peer's communications session
+  * @return the location response on success, otherwise an error response
+  */
+ @POST
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("{portType}/{portId}/transactions")
+ // TODO - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
+ @ApiOperation(
+         value = "Create a transaction to the specified output port or input port",
+         response = TransactionResultEntity.class,
+         authorizations = {
+                 @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
+                 @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"),
+                 @Authorization(value = "Administrator", type = "ROLE_ADMIN")
+         }
+ )
+ @ApiResponses(
+         value = {
+                 @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+                 @ApiResponse(code = 401, message = "Client could not be authenticated."),
+                 @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+                 @ApiResponse(code = 404, message = "The specified resource could not be found."),
+                 @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful."),
+                 @ApiResponse(code = 503, message = "NiFi instance is not ready for serving request, or temporarily overloaded. Retrying the same request later may be successful"),
+         }
+ )
+ public Response createPortTransaction(
+         @ApiParam(
+                 value = "The port type.",
+                 required = true,
+                 allowableValues = "input-ports, output-ports"
+         )
+         @PathParam("portType") String portType,
+         @PathParam("portId") String portId,
+         @Context HttpServletRequest req,
+         @Context ServletContext context,
+         @Context UriInfo uriInfo,
+         InputStream inputStream) {
+
+     // Reject unknown port types before doing any authorization or validation work.
+     if (!PORT_TYPE_INPUT.equals(portType) && !PORT_TYPE_OUTPUT.equals(portType)) {
+         return responseCreator.wrongPortTypeResponse(portType, portId);
+     }
+
+     // authorize access
+     authorizeDataTransfer(PORT_TYPE_INPUT.equals(portType) ? ResourceType.InputPort : ResourceType.OutputPort, portId);
+
+     final ValidateRequestResult validationResult = validateResult(req, portId);
+     if (validationResult.errResponse != null) {
+         return validationResult.errResponse;
+     }
+
+     // One placeholder per argument: the former "clientId={}" had no matching argument,
+     // which shifted every value under the wrong label.
+     logger.debug("createPortTransaction request: portType={}, portId={}", portType, portId);
+
+     final ByteArrayOutputStream out = new ByteArrayOutputStream();
+     final String transactionId = transactionManager.createTransaction();
+     final Peer peer = constructPeer(req, inputStream, out, portId, transactionId);
+     final int transportProtocolVersion = validationResult.transportProtocolVersion;
+
+     try {
+         // Execute handshake.
+         initiateServerProtocol(peer, transportProtocolVersion);
+
+         final TransactionResultEntity entity = new TransactionResultEntity();
+         entity.setResponseCode(ResponseCode.PROPERTIES_OK.getCode());
+         entity.setMessage("Handshake properties are valid, and port is running. A transaction is created:" + transactionId);
+
+         return responseCreator.locationResponse(uriInfo, portType, portId, transactionId, entity, transportProtocolVersion, transactionManager);
+
+     } catch (HandshakeException e) {
+         // The transaction was already registered above; do not leave it dangling.
+         transactionManager.cancelTransaction(transactionId);
+         return responseCreator.handshakeExceptionResponse(e);
+
+     } catch (Exception e) {
+         transactionManager.cancelTransaction(transactionId);
+         return responseCreator.unexpectedErrorResponse(portId, e);
+     }
+ }
+
+ /**
+  * Receives flow files sent by a client into the given input port as part of an
+  * existing transaction.
+  *
+  * @param portId the id of the input port
+  * @param transactionId the id of the transaction the flow files belong to
+  * @param req the servlet request, used for validation and for building the peer
+  * @param context the servlet context (not used by this method)
+  * @param inputStream the request body carrying the flow file data
+  * @return the accepted response carrying the server-side checksum, a 400 response when
+  *         no flow file was received, or an error response
+  */
+ @POST
+ @Consumes(MediaType.APPLICATION_OCTET_STREAM)
+ @Produces(MediaType.TEXT_PLAIN)
+ @Path("input-ports/{portId}/transactions/{transactionId}/flow-files")
+ // TODO - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
+ @ApiOperation(
+         value = "Transfer flow files to the input port",
+         response = String.class,
+         authorizations = {
+                 @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
+                 @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"),
+                 @Authorization(value = "Administrator", type = "ROLE_ADMIN")
+         }
+ )
+ @ApiResponses(
+         value = {
+                 @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+                 @ApiResponse(code = 401, message = "Client could not be authenticated."),
+                 @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+                 @ApiResponse(code = 404, message = "The specified resource could not be found."),
+                 @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful."),
+                 @ApiResponse(code = 503, message = "NiFi instance is not ready for serving request, or temporarily overloaded. Retrying the same request later may be successful"),
+         }
+ )
+ public Response receiveFlowFiles(
+         @ApiParam(
+                 value = "The input port id.",
+                 required = true
+         )
+         @PathParam("portId") String portId,
+         @PathParam("transactionId") String transactionId,
+         @Context HttpServletRequest req,
+         @Context ServletContext context,
+         InputStream inputStream) {
+
+     // authorize access
+     authorizeDataTransfer(ResourceType.InputPort, portId);
+
+     final ValidateRequestResult validation = validateResult(req, portId, transactionId);
+     if (validation.errResponse != null) {
+         return validation.errResponse;
+     }
+
+     logger.debug("receiveFlowFiles request: portId={}", portId);
+
+     final Peer peer = constructPeer(req, inputStream, new ByteArrayOutputStream(), portId, transactionId);
+     final int protocolVersion = validation.transportProtocolVersion;
+
+     try {
+         final HttpFlowFileServerProtocol protocol = initiateServerProtocol(peer, protocolVersion);
+         final int received = protocol.getPort().receiveFlowFiles(peer, protocol);
+         logger.debug("finished receiving flow files, numOfFlowFiles={}", received);
+
+         // A request that delivered nothing is treated as a client error.
+         if (received <= 0) {
+             final Response.ResponseBuilder nothingSent = Response.status(Response.Status.BAD_REQUEST);
+             nothingSent.entity("Client should send request when there is data to send. There was no flow file sent.");
+             return nothingSent.build();
+         }
+     } catch (HandshakeException e) {
+         return responseCreator.handshakeExceptionResponse(e);
+
+     } catch (NotAuthorizedException e) {
+         return responseCreator.unauthorizedResponse(e);
+
+     } catch (BadRequestException | RequestExpiredException e) {
+         return responseCreator.badRequestResponse(e);
+
+     } catch (Exception e) {
+         return responseCreator.unexpectedErrorResponse(portId, e);
+     }
+
+     // The checksum is read from the communications session and returned to the client.
+     final HttpServerCommunicationsSession session = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
+     final String serverChecksum = session.getChecksum();
+     return responseCreator.acceptedResponse(transactionManager, serverChecksum, protocolVersion);
+ }
+
+ private HttpFlowFileServerProtocol initiateServerProtocol(Peer peer, Integer transportProtocolVersion) throws IOException {
+ // Switch transaction protocol version based on transport protocol version.
+ TransportProtocolVersionNegotiator negotiatedTransportProtocolVersion = new TransportProtocolVersionNegotiator(transportProtocolVersion);
+ VersionNegotiator versionNegotiator = new StandardVersionNegotiator(negotiatedTransportProtocolVersion.getTransactionProtocolVersion());
+ HttpFlowFileServerProtocol serverProtocol = getHttpFlowFileServerProtocol(versionNegotiator);
+ HttpRemoteSiteListener.getInstance().setupServerProtocol(serverProtocol);
+ // TODO: How should I pass cluster information?
+ // serverProtocol.setNodeInformant(clusterManager);
+ serverProtocol.handshake(peer);
+ return serverProtocol;
+ }
+
+ HttpFlowFileServerProtocol getHttpFlowFileServerProtocol(VersionNegotiator versionNegotiator) {
+ return new HttpFlowFileServerProtocolImpl(versionNegotiator);
+ }
+
+ /**
+  * Builds the {@code Peer} representing the remote client of this HTTP request.
+  *
+  * Handshake parameters are taken from the request headers. The port id and the
+  * compression flag are always set; expiration and batch settings only when their header is non-empty.
+  * For secure requests the current NiFi user's identity is recorded on the session.
+  *
+  * @param req the servlet request to read client address and headers from
+  * @param inputStream stream the communications session reads from
+  * @param outputStream stream the communications session writes to
+  * @param portId the port id stored as the PORT_IDENTIFIER handshake parameter
+  * @param transactionId the transaction id bound to the communications session
+  * @return the peer wrapping the description and communications session
+  */
+ private Peer constructPeer(HttpServletRequest req, InputStream inputStream, OutputStream outputStream, String portId, String transactionId) {
+     final String remoteHost = req.getRemoteHost();
+     final int remotePort = req.getRemotePort();
+     final PeerDescription description = new PeerDescription(remoteHost, remotePort, req.isSecure());
+
+     final HttpServerCommunicationsSession session = new HttpServerCommunicationsSession(inputStream, outputStream, transactionId);
+
+     // parseBoolean yields false for a missing or empty header, true only for "true" (case-insensitive).
+     final boolean gzip = Boolean.parseBoolean(req.getHeader(HANDSHAKE_PROPERTY_USE_COMPRESSION));
+
+     session.putHandshakeParam(HandshakeProperty.PORT_IDENTIFIER, portId);
+     session.putHandshakeParam(HandshakeProperty.GZIP, String.valueOf(gzip));
+
+     final String expiration = req.getHeader(HANDSHAKE_PROPERTY_REQUEST_EXPIRATION);
+     if (!isEmpty(expiration)) {
+         session.putHandshakeParam(REQUEST_EXPIRATION_MILLIS, expiration);
+     }
+
+     final String count = req.getHeader(HANDSHAKE_PROPERTY_BATCH_COUNT);
+     if (!isEmpty(count)) {
+         session.putHandshakeParam(BATCH_COUNT, count);
+     }
+
+     final String size = req.getHeader(HANDSHAKE_PROPERTY_BATCH_SIZE);
+     if (!isEmpty(size)) {
+         session.putHandshakeParam(BATCH_SIZE, size);
+     }
+
+     final String duration = req.getHeader(HANDSHAKE_PROPERTY_BATCH_DURATION);
+     if (!isEmpty(duration)) {
+         session.putHandshakeParam(BATCH_DURATION, duration);
+     }
+
+     if (description.isSecure()) {
+         final NiFiUser currentUser = NiFiUserUtils.getNiFiUser();
+         logger.debug("initiating peer, nifiUser={}", currentUser);
+         session.setUserDn(currentUser.getIdentity());
+     }
+
+     // TODO: Followed how SocketRemoteSiteListener define peerUrl and clusterUrl, but it can be more meaningful values, especially for clusterUrl.
+     final String peerUrl = "nifi://" + remoteHost + ":" + remotePort;
+     final String clusterUrl = "nifi://localhost:" + req.getLocalPort();
+     return new Peer(description, session, peerUrl, clusterUrl);
+ }
+
+ /**
+  * Commits or cancels a transaction in which flow files were transferred from an output port.
+  *
+  * The handshake is run first; only afterwards is the response code checked. A missing
+  * or unsupported response code produces a 400 response carrying ABORT. CANCEL_TRANSACTION
+  * cancels the transaction, anything else commits the transfer using the client checksum.
+  *
+  * @param responseCode CONFIRM_TRANSACTION or CANCEL_TRANSACTION, as sent by the client
+  * @param checksum the client-side checksum handed to the commit
+  * @param portId the id of the output port
+  * @param transactionId the id of the transaction to finish
+  * @param req the servlet request, used for validation and for building the peer
+  * @param context the servlet context (not used by this method)
+  * @param inputStream the request body, handed to the peer's communications session
+  * @return the result entity wrapped in a response, or an error response
+  */
+ @DELETE
+ @Consumes(MediaType.APPLICATION_OCTET_STREAM)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("output-ports/{portId}/transactions/{transactionId}")
+ // TODO - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
+ @ApiOperation(
+         value = "Commit or cancel the specified transaction",
+         response = TransactionResultEntity.class,
+         authorizations = {
+                 @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
+                 @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"),
+                 @Authorization(value = "Administrator", type = "ROLE_ADMIN")
+         }
+ )
+ @ApiResponses(
+         value = {
+                 @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+                 @ApiResponse(code = 401, message = "Client could not be authenticated."),
+                 @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+                 @ApiResponse(code = 404, message = "The specified resource could not be found."),
+                 @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful."),
+                 @ApiResponse(code = 503, message = "NiFi instance is not ready for serving request, or temporarily overloaded. Retrying the same request later may be successful"),
+         }
+ )
+ public Response commitOutputPortTransaction(
+         @ApiParam(
+                 value = "The response code. Available values are CONFIRM_TRANSACTION(12) or CANCEL_TRANSACTION(15).",
+                 required = true
+         )
+         @QueryParam(RESPONSE_CODE) Integer responseCode,
+         @ApiParam(
+                 value = "A checksum calculated at client side using CRC32 to check flow file content integrity. It must match with the value calculated at server side.",
+                 required = true
+         )
+         @QueryParam(CHECK_SUM) @DefaultValue(StringUtils.EMPTY) String checksum,
+         @ApiParam(
+                 value = "The output port id.",
+                 required = true
+         )
+         @PathParam("portId") String portId,
+         @ApiParam(
+                 value = "The transaction id.",
+                 required = true
+         )
+         @PathParam("transactionId") String transactionId,
+         @Context HttpServletRequest req,
+         @Context ServletContext context,
+         InputStream inputStream) {
+
+     // authorize access
+     authorizeDataTransfer(ResourceType.OutputPort, portId);
+
+     final ValidateRequestResult validation = validateResult(req, portId, transactionId);
+     if (validation.errResponse != null) {
+         return validation.errResponse;
+     }
+
+     logger.debug("commitOutputPortTransaction request: portId={}, transactionId={}", portId, transactionId);
+
+     final int protocolVersion = validation.transportProtocolVersion;
+     final Peer peer = constructPeer(req, inputStream, new ByteArrayOutputStream(), portId, transactionId);
+
+     final TransactionResultEntity entity = new TransactionResultEntity();
+     try {
+         final HttpFlowFileServerProtocol protocol = initiateServerProtocol(peer, protocolVersion);
+
+         // Work out whether the client supplied one of the two supported response codes.
+         final String rejection;
+         if (responseCode == null) {
+             rejection = "responseCode is required.";
+         } else {
+             final boolean confirm = ResponseCode.CONFIRM_TRANSACTION.getCode() == responseCode;
+             final boolean cancel = ResponseCode.CANCEL_TRANSACTION.getCode() == responseCode;
+             rejection = (confirm || cancel) ? null : "responseCode " + responseCode + " is invalid. ";
+         }
+
+         if (rejection != null) {
+             entity.setMessage(rejection);
+             entity.setResponseCode(ResponseCode.ABORT.getCode());
+             return Response.status(Response.Status.BAD_REQUEST).entity(entity).build();
+         }
+
+         if (ResponseCode.CANCEL_TRANSACTION.getCode() == responseCode) {
+             return cancelTransaction(transactionId, entity);
+         }
+
+         final int sent = protocol.commitTransferTransaction(peer, checksum);
+         entity.setResponseCode(ResponseCode.CONFIRM_TRANSACTION.getCode());
+         entity.setFlowFileSent(sent);
+
+     } catch (HandshakeException e) {
+         return responseCreator.handshakeExceptionResponse(e);
+
+     } catch (Exception e) {
+         final HttpServerCommunicationsSession session = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
+         logger.error("Failed to process the request", e);
+
+         // Only a BAD_CHECKSUM recorded on the session is reported as a 400; everything else is unexpected.
+         if (!ResponseCode.BAD_CHECKSUM.equals(session.getResponseCode())) {
+             return responseCreator.unexpectedErrorResponse(portId, transactionId, e);
+         }
+
+         entity.setResponseCode(session.getResponseCode().getCode());
+         entity.setMessage(e.getMessage());
+         return clusterContext(noCache(Response.status(Response.Status.BAD_REQUEST).entity(entity))).build();
+     }
+
+     final Response.ResponseBuilder ok = setCommonHeaders(Response.ok(entity), protocolVersion, transactionManager);
+     return clusterContext(noCache(ok)).build();
+ }
+
+
+ @DELETE
+ @Consumes(MediaType.APPLICATION_OCTET_STREAM)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("input-ports/{portId}/transactions/{transactionId}")
+ // TODO - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
+ @ApiOperation(
+ value = "Commit or cancel the specified transaction",
+ response = TransactionResultEntity.class,
+ authorizations = {
+ @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
+ @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"),
+ @Authorization(value = "Administrator", type = "ROLE_ADMIN")
+ }
+ )
+ @ApiResponses(
+ value = {
+ @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+ @ApiResponse(code = 401, message = "Client could not be authenticated."),
+ @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+ @ApiResponse(code = 404, message = "The specified resource could not be found."),
+ @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful."),
+ @ApiResponse(code = 503, message = "NiFi instance is not ready for serving request, or temporarily overloaded. Retrying the same request later may be successful"),
+ }
+ )
+ public Response commitInputPortTransaction(
+ @ApiParam(
+ value = "The response code. Available values are BAD_CHECKSUM(19), CONFIRM_TRANSACTION(12) or CANCEL_TRANSACTION(15).",
+ required = true
+ )
+ @QueryParam(RESPONSE_CODE) Integer responseCode,
+ @ApiParam(
+ value = "The input port id.",
+ required = true
+ )
+ @PathParam("portId") String portId,
+ @ApiParam(
+ value = "The transaction id.",
+ required = true
+ )
+ @PathParam("transactionId") String transactionId,
+ @Context HttpServletRequest req,
+ @Context ServletContext context,
+ InputStream inputStream) {
+
+ // authorize access
+ authorizeDataTransfer(ResourceType.InputPort, portId);
+
+ final ValidateRequestResult validationResult = validateResult(req, portId, transactionId);
+ if (validationResult.errResponse != null) {
+ return validationResult.errResponse;
+ }
+
+ logger.debug("commitInputPortTransaction request: portId={}, transactionId={}, responseCode={}",
+ portId, transactionId, responseCode);
+
+ final int transportProtocolVersion = validationResult.transportProtocolVersion;
+ final ByteArrayOutputStream out = new ByteArrayOutputStream();
+ final Peer peer = constructPeer(req, inputStream, out, portId, transactionId);
+
+ final TransactionResultEntity entity = new TransactionResultEntity();
+ try {
+ HttpFlowFileServerProtocol serverProtocol = initiateServerProtocol(peer, transportProtocolVersion);
+ HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
+ // Pass the response code sent from the client.
+ String inputErrMessage = null;
+ if (responseCode == null) {
+ inputErrMessage = "responseCode is required.";
+ } else if(ResponseCode.BAD_CHECKSUM.getCode() != responseCode
+ && ResponseCode.CONFIRM_TRANSACTION.getCode() != responseCode
+ && ResponseCode.CANCEL_TRANSACTION.getCode() != responseCode) {
+ inputErrMessage = "responseCode " + responseCode + " is invalid. ";
+ }
+
+ if (inputErrMessage != null){
+ entity.setMessage(inputErrMessage);
+ entity.setResponseCode(ResponseCode.ABORT.getCode());
+ return Response.status(Response.Status.BAD_REQUEST).entity(entity).build();
+ }
+
+ if (ResponseCode.CANCEL_TRANSACTION.getCode() == responseCode) {
+ return cancelTransaction(transactionId, entity);
+ }
+
+ commsSession.setResponseCode(ResponseCode.fromCode(responseCode));
+
+ try {
+ int flowFileSent = serverProtocol.commitReceiveTransaction(peer);
+ entity.setResponseCode(commsSession.getResponseCode().getCode());
+ entity.setFlowFileSent(flowFileSent);
+
+ } catch (IOException e){
+ if (ResponseCode.BAD_CHECKSUM.getCode() == responseCode && e.getMessage().contains("Received a BadChecksum response")){
+ // AbstractFlowFileServerProtocol throws IOException after it canceled transaction.
+ // This is a known behavior and if we return 500 with this exception,
+ // it's not clear if there is an issue at server side, or cancel operation has been accomplished.
+ // Above conditions can guarantee this is the latter case, we return 200 OK here.
+ entity.setResponseCode(ResponseCode.CANCEL_TRANSACTION.getCode());
+ return clusterContext(noCache(Response.ok(entity))).build();
+ } else {
+ return responseCreator.unexpectedErrorResponse(portId, transactionId, e);
+ }
+ }
+
+ } catch (HandshakeException e) {
+ return responseCreator.handshakeExceptionResponse(e);
+
+ } catch (Exception e) {
+ return responseCreator.unexpectedErrorResponse(portId, transactionId, e);
+ }
+
+ return clusterContext(noCache(setCommonHeaders(Response.ok(entity), transportProtocolVersion, transactionManager))).build();
+ }
+
+ private Response cancelTransaction(String transactionId, TransactionResultEntity entity) {
+ transactionManager.cancelTransaction(transactionId);
+ entity.setMessage("Transaction has been canceled.");
+ entity.setResponseCode(ResponseCode.CANCEL_TRANSACTION.getCode());
+ return Response.ok(entity).build();
+ }
+
+
+ /**
+  * Streams flow files from the given output port to the client as part of an existing transaction.
+  *
+  * The handshake runs against a temporary buffer. The actual transfer happens later,
+  * when the returned streaming entity is written to the real HTTP response stream.
+  *
+  * @param portId the id of the output port
+  * @param transactionId the id of the transaction the transfer belongs to
+  * @param req the servlet request, used for validation and for building the peer
+  * @param res the servlet response (not used by this method)
+  * @param context the servlet context (not used by this method)
+  * @param inputStream the request body, handed to the peer's communications session
+  * @return the accepted response carrying the streaming entity, or an error response
+  */
+ @GET
+ @Consumes(MediaType.WILDCARD)
+ @Produces(MediaType.APPLICATION_OCTET_STREAM)
+ @Path("output-ports/{portId}/transactions/{transactionId}/flow-files")
+ // TODO - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
+ @ApiOperation(
+         value = "Transfer flow files from the output port",
+         response = StreamingOutput.class,
+         authorizations = {
+                 @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
+                 @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"),
+                 @Authorization(value = "Administrator", type = "ROLE_ADMIN")
+         }
+ )
+ @ApiResponses(
+         value = {
+                 @ApiResponse(code = 200, message = "There is no flow file to return."),
+                 @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+                 @ApiResponse(code = 401, message = "Client could not be authenticated."),
+                 @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+                 @ApiResponse(code = 404, message = "The specified resource could not be found."),
+                 @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful."),
+                 @ApiResponse(code = 503, message = "NiFi instance is not ready for serving request, or temporarily overloaded. Retrying the same request later may be successful"),
+         }
+ )
+ public Response transferFlowFiles(
+         @ApiParam(
+                 value = "The output port id.",
+                 required = true
+         )
+         @PathParam("portId") String portId,
+         @PathParam("transactionId") String transactionId,
+         @Context HttpServletRequest req,
+         @Context HttpServletResponse res,
+         @Context ServletContext context,
+         InputStream inputStream) {
+
+     // authorize access
+     authorizeDataTransfer(ResourceType.OutputPort, portId);
+
+     final ValidateRequestResult validation = validateResult(req, portId, transactionId);
+     if (validation.errResponse != null) {
+         return validation.errResponse;
+     }
+
+     logger.debug("transferFlowFiles request: portId={}", portId);
+
+     // Before opening the real output stream for HTTP response,
+     // use this temporary output stream to buffer handshake result.
+     final ByteArrayOutputStream handshakeBuffer = new ByteArrayOutputStream();
+     final Peer peer = constructPeer(req, inputStream, handshakeBuffer, portId, transactionId);
+     final int protocolVersion = validation.transportProtocolVersion;
+
+     try {
+         final HttpFlowFileServerProtocol protocol = initiateServerProtocol(peer, protocolVersion);
+
+         final StreamingOutput flowFileContent = responseStream -> {
+             // Redirect the session output to the real response stream before transferring.
+             final HttpOutput sessionOutput = (HttpOutput) peer.getCommunicationsSession().getOutput();
+             sessionOutput.setOutputStream(responseStream);
+
+             try {
+                 final int transferred = protocol.getPort().transferFlowFiles(peer, protocol);
+                 logger.debug("finished transferring flow files, numOfFlowFiles={}", transferred);
+                 if (transferred < 1) {
+                     // There was no flow file to transfer. Throw this exception to stop responding with SEE OTHER.
+                     throw new WebApplicationException(Response.Status.OK);
+                 }
+             } catch (NotAuthorizedException | BadRequestException | RequestExpiredException e) {
+                 // Handshake is done outside of write() method, so these exception wouldn't be thrown.
+                 throw new IOException("Failed to process the request.", e);
+             }
+         };
+
+         return responseCreator.acceptedResponse(transactionManager, flowFileContent, protocolVersion);
+
+     } catch (HandshakeException e) {
+         return responseCreator.handshakeExceptionResponse(e);
+
+     } catch (Exception e) {
+         return responseCreator.unexpectedErrorResponse(portId, e);
+     }
+ }
+
+ /**
+  * Extends the time-to-live of a transaction on an input port.
+  *
+  * Authorizes data-transfer access to the input port and then delegates to
+  * {@code extendPortTransactionTTL} with {@code PORT_TYPE_INPUT}.
+  *
+  * @param portId the id of the input port
+  * @param transactionId the id of the transaction to extend
+  * @param req the servlet request
+  * @param res the servlet response
+  * @param context the servlet context
+  * @param uriInfo the URI info of the request
+  * @param inputStream the request body
+  * @return the response produced by {@code extendPortTransactionTTL}
+  */
+ @PUT
+ @Consumes(MediaType.WILDCARD)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("input-ports/{portId}/transactions/{transactionId}")
+ // TODO - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
+ @ApiOperation(
+         value = "Extend transaction TTL",
+         response = TransactionResultEntity.class,
+         authorizations = {
+                 @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
+                 @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"),
+                 @Authorization(value = "Administrator", type = "ROLE_ADMIN")
+         }
+ )
+ @ApiResponses(
+         value = {
+                 @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+                 @ApiResponse(code = 401, message = "Client could not be authenticated."),
+                 @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+                 @ApiResponse(code = 404, message = "The specified resource could not be found."),
+                 @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful."),
+                 // Declared for consistency with the output-port TTL endpoint, which shares the same implementation.
+                 @ApiResponse(code = 503, message = "NiFi instance is not ready for serving request, or temporarily overloaded. Retrying the same request later may be successful"),
+         }
+ )
+ public Response extendInputPortTransactionTTL(
+         @PathParam("portId") String portId,
+         @PathParam("transactionId") String transactionId,
+         @Context HttpServletRequest req,
+         @Context HttpServletResponse res,
+         @Context ServletContext context,
+         @Context UriInfo uriInfo,
+         InputStream inputStream) {
+
+     // authorize access
+     authorizeDataTransfer(ResourceType.InputPort, portId);
+
+     return extendPortTransactionTTL(PORT_TYPE_INPUT, portId, transactionId, req, res, context, uriInfo, inputStream);
+ }
+
+ /**
+  * Extends the time-to-live of a transaction on an output port.
+  *
+  * Authorizes data-transfer access to the output port and then delegates to
+  * {@code extendPortTransactionTTL} with {@code PORT_TYPE_OUTPUT}.
+  *
+  * @param portId the id of the output port
+  * @param transactionId the id of the transaction to extend
+  * @param req the servlet request
+  * @param res the servlet response
+  * @param context the servlet context
+  * @param uriInfo the URI info of the request
+  * @param inputStream the request body
+  * @return the response produced by {@code extendPortTransactionTTL}
+  */
+ @PUT
+ @Consumes(MediaType.WILDCARD)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("output-ports/{portId}/transactions/{transactionId}")
+ // TODO - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
+ @ApiOperation(
+         value = "Extend transaction TTL",
+         response = TransactionResultEntity.class,
+         authorizations = {
+                 @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
+                 @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"),
+                 @Authorization(value = "Administrator", type = "ROLE_ADMIN")
+         }
+ )
+ @ApiResponses(
+         value = {
+                 @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+                 @ApiResponse(code = 401, message = "Client could not be authenticated."),
+                 @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+                 @ApiResponse(code = 404, message = "The specified resource could not be found."),
+                 @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful."),
+                 @ApiResponse(code = 503, message = "NiFi instance is not ready for serving request, or temporarily overloaded. Retrying the same request later may be successful"),
+         }
+ )
+ public Response extendOutputPortTransactionTTL(
+         @PathParam("portId") String portId,
+         @PathParam("transactionId") String transactionId,
+         @Context HttpServletRequest req,
+         @Context HttpServletResponse res,
+         @Context ServletContext context,
+         @Context UriInfo uriInfo,
+         InputStream inputStream) {
+
+     // authorize access
+     authorizeDataTransfer(ResourceType.OutputPort, portId);
+
+     final Response extended =
+             extendPortTransactionTTL(PORT_TYPE_OUTPUT, portId, transactionId, req, res, context, uriInfo, inputStream);
+     return extended;
+ }
+
+ /**
+  * Shared implementation for extending the time-to-live of a transaction on an
+  * input or output port.
+  *
+  * The request is validated (including that the transaction is active), the port
+  * type is checked, the handshake is performed and the transaction is then extended.
+  *
+  * @param portType either {@code PORT_TYPE_INPUT} or {@code PORT_TYPE_OUTPUT}; anything else yields the wrong-port-type response
+  * @param portId the id of the port
+  * @param transactionId the id of the transaction to extend
+  * @param req the servlet request, used for validation and for building the peer
+  * @param res the servlet response (not used by this method)
+  * @param context the servlet context (not used by this method)
+  * @param uriInfo the URI info of the request (not used by this method)
+  * @param inputStream the request body, handed to the peer's communications session
+  * @return a 200 response carrying CONTINUE_TRANSACTION, or an error response
+  */
+ public Response extendPortTransactionTTL(
+         String portType,
+         String portId,
+         String transactionId,
+         HttpServletRequest req,
+         HttpServletResponse res,
+         ServletContext context,
+         UriInfo uriInfo,
+         InputStream inputStream) {
+
+     final ValidateRequestResult validationResult = validateResult(req, portId, transactionId);
+     if (validationResult.errResponse != null) {
+         return validationResult.errResponse;
+     }
+
+     if (!PORT_TYPE_INPUT.equals(portType) && !PORT_TYPE_OUTPUT.equals(portType)) {
+         return responseCreator.wrongPortTypeResponse(portType, portId);
+     }
+
+     // Labelled with this helper's own name: it serves input as well as output ports,
+     // so the former "extendOutputPortTransactionTTL" label was misleading for input ports.
+     logger.debug("extendPortTransactionTTL request: portType={}, portId={}, transactionId={}",
+             portType, portId, transactionId);
+
+     final int transportProtocolVersion = validationResult.transportProtocolVersion;
+     final ByteArrayOutputStream out = new ByteArrayOutputStream();
+     final Peer peer = constructPeer(req, inputStream, out, portId, transactionId);
+
+     try {
+         // Do handshake
+         initiateServerProtocol(peer, transportProtocolVersion);
+         transactionManager.extendsTransaction(transactionId);
+
+         final TransactionResultEntity entity = new TransactionResultEntity();
+         entity.setResponseCode(ResponseCode.CONTINUE_TRANSACTION.getCode());
+         entity.setMessage("Extended TTL.");
+         return clusterContext(noCache(setCommonHeaders(Response.ok(entity), transportProtocolVersion, transactionManager))).build();
+
+     } catch (HandshakeException e) {
+         return responseCreator.handshakeExceptionResponse(e);
+
+     } catch (Exception e) {
+         return responseCreator.unexpectedErrorResponse(portId, transactionId, e);
+     }
+
+ }
+
+ private class ValidateRequestResult {
+ private Integer transportProtocolVersion;
+ private Response errResponse;
+ }
+
+ private ValidateRequestResult validateResult(HttpServletRequest req, String portId) {
+ return validateResult(req, portId, null);
+ }
+
+ private ValidateRequestResult validateResult(HttpServletRequest req, String portId, String transactionId) {
+ ValidateRequestResult result = new ValidateRequestResult();
+ if(!properties.isSiteToSiteHttpEnabled()) {
+ result.errResponse = responseCreator.httpSiteToSiteIsNotEnabledResponse();
+ return result;
+ }
+
+ // TODO: NCM no longer exists.
+ /*
+ if (properties.isClusterManager()) {
+ result.errResponse = responseCreator.nodeTypeErrorResponse(req.getPathInfo() + " is not available on a NiFi Cluster Manager.");
+ return result;
+ }
+ */
+
+
+ try {
+ result.transportProtocolVersion = negotiateTransportProtocolVersion(req, transportProtocolVersionNegotiator);
+ } catch (BadRequestException e) {
+ result.errResponse = responseCreator.badRequestResponse(e);
+ return result;
+ }
+
+ if(!isEmpty(transactionId) && !transactionManager.isTransactionActive(transactionId)) {
+ result.errResponse = responseCreator.transactionNotFoundResponse(portId, transactionId);
+ return result;
+ }
+
+ return result;
+ }
+
+
+ // setters
+
+ /**
+  * Sets the authorizer held by this resource.
+  *
+  * @param authorizer the authorizer to store
+  */
+ public void setAuthorizer(Authorizer authorizer) {
+ this.authorizer = authorizer;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/e0c96794/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
index 4447014..b719e24 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
@@ -16,31 +16,13 @@
*/
package org.apache.nifi.web.api;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.GET;
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.PUT;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-
+import com.sun.jersey.api.core.ResourceContext;
+import com.wordnik.swagger.annotations.Api;
+import com.wordnik.swagger.annotations.ApiOperation;
+import com.wordnik.swagger.annotations.ApiParam;
+import com.wordnik.swagger.annotations.ApiResponse;
+import com.wordnik.swagger.annotations.ApiResponses;
+import com.wordnik.swagger.annotations.Authorization;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.AccessDeniedException;
import org.apache.nifi.authorization.AuthorizationRequest;
@@ -123,13 +105,29 @@ import org.apache.nifi.web.api.request.DateTimeParameter;
import org.apache.nifi.web.api.request.IntegerParameter;
import org.apache.nifi.web.api.request.LongParameter;
-import com.sun.jersey.api.core.ResourceContext;
-import com.wordnik.swagger.annotations.Api;
-import com.wordnik.swagger.annotations.ApiOperation;
-import com.wordnik.swagger.annotations.ApiParam;
-import com.wordnik.swagger.annotations.ApiResponse;
-import com.wordnik.swagger.annotations.ApiResponses;
-import com.wordnik.swagger.annotations.Authorization;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
/**
* RESTful endpoint for managing a Flow.
@@ -208,7 +206,7 @@ public class FlowResource extends ApplicationResource {
/**
* Authorizes access to the flow.
*/
- private void authorizeFlow(final RequestAction action) {
+ private void authorizeFlow() {
final NiFiUser user = NiFiUserUtils.getNiFiUser();
final Map<String,String> userContext;
@@ -224,7 +222,7 @@ public class FlowResource extends ApplicationResource {
.identity(user.getIdentity())
.anonymous(user.isAnonymous())
.accessAttempt(true)
- .action(action)
+ .action(RequestAction.READ)
.userContext(userContext)
.build();
@@ -287,7 +285,7 @@ public class FlowResource extends ApplicationResource {
}
)
public Response generateClientId() {
- authorizeFlow(RequestAction.READ);
+ authorizeFlow();
return clusterContext(generateOkResponse(generateUuid())).build();
}
@@ -321,7 +319,7 @@ public class FlowResource extends ApplicationResource {
)
public Response getFlowConfig() {
- authorizeFlow(RequestAction.READ);
+ authorizeFlow();
if (isReplicateRequest()) {
return replicate(HttpMethod.GET);
@@ -350,7 +348,7 @@ public class FlowResource extends ApplicationResource {
)
public Response getCurrentUser() {
- authorizeFlow(RequestAction.READ);
+ authorizeFlow();
// note that the cluster manager will handle this request directly
final NiFiUser user = NiFiUserUtils.getNiFiUser();
@@ -413,7 +411,7 @@ public class FlowResource extends ApplicationResource {
)
@QueryParam("recursive") @DefaultValue(RECURSIVE) Boolean recursive) throws InterruptedException {
- authorizeFlow(RequestAction.READ);
+ authorizeFlow();
if (isReplicateRequest()) {
return replicate(HttpMethod.GET);
@@ -458,7 +456,7 @@ public class FlowResource extends ApplicationResource {
)
public Response getControllerServicesFromController() {
- authorizeFlow(RequestAction.READ);
+ authorizeFlow();
if (isReplicateRequest()) {
return replicate(HttpMethod.GET);
@@ -511,7 +509,7 @@ public class FlowResource extends ApplicationResource {
)
@PathParam("id") String groupId) throws InterruptedException {
- authorizeFlow(RequestAction.READ);
+ authorizeFlow();
// get all the controller services
final Set<ControllerServiceEntity> controllerServices = serviceFacade.getControllerServices(groupId);
@@ -566,7 +564,7 @@ public class FlowResource extends ApplicationResource {
)
@QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId) {
- authorizeFlow(RequestAction.READ);
+ authorizeFlow();
if (isReplicateRequest()) {
return replicate(HttpMethod.GET);
@@ -622,7 +620,7 @@ public class FlowResource extends ApplicationResource {
@PathParam("id") String id,
ScheduleComponentsEntity scheduleComponentsEntity) {
- authorizeFlow(RequestAction.READ);
+ authorizeFlow();
// ensure the same id is being used
if (!id.equals(scheduleComponentsEntity.getId())) {
@@ -753,7 +751,7 @@ public class FlowResource extends ApplicationResource {
}
)
public Response searchFlow(@QueryParam("q") @DefaultValue(StringUtils.EMPTY) String value) throws InterruptedException {
- authorizeFlow(RequestAction.READ);
+ authorizeFlow();
// query the controller
final SearchResultsDTO results = serviceFacade.searchController(value);
@@ -796,7 +794,7 @@ public class FlowResource extends ApplicationResource {
)
public Response getControllerStatus() throws InterruptedException {
- authorizeFlow(RequestAction.READ);
+ authorizeFlow();
if (isReplicateRequest()) {
return replicate(HttpMethod.GET);
@@ -841,7 +839,7 @@ public class FlowResource extends ApplicationResource {
)
public Response getBanners() {
- authorizeFlow(RequestAction.READ);
+ authorizeFlow();
// get the banner from the properties - will come from the NCM when clustered
final String bannerText = getProperties().getBannerText();
@@ -888,7 +886,7 @@ public class FlowResource extends ApplicationResource {
}
)
public Response getProcessorTypes() throws InterruptedException {
- authorizeFlow(RequestAction.READ);
+ authorizeFlow();
// create response entity
final ProcessorTypesEntity entity = new ProcessorTypesEntity();
@@ -933,7 +931,7 @@ public class FlowResource extends ApplicationResource {
required = false
)
@QueryParam("serviceType") String serviceType) throws InterruptedException {
- authorizeFlow(RequestAction.READ);
+ authorizeFlow();
// create response entity
final ControllerServiceTypesEntity entity = new ControllerServiceTypesEntity();
@@ -972,7 +970,7 @@ public class FlowResource extends ApplicationResource {
}
)
public Response getReportingTaskTypes() throws InterruptedException {
- authorizeFlow(RequestAction.READ);
+ authorizeFlow();
// create response entity
final ReportingTaskTypesEntity entity = new ReportingTaskTypesEntity();
@@ -1011,7 +1009,7 @@ public class FlowResource extends ApplicationResource {
}
)
public Response getPrioritizers() throws InterruptedException {
- authorizeFlow(RequestAction.READ);
+ authorizeFlow();
// create response entity
final PrioritizerTypesEntity entity = new PrioritizerTypesEntity();
@@ -1049,7 +1047,7 @@ public class FlowResource extends ApplicationResource {
}
)
public Response getAboutInfo() {
- authorizeFlow(RequestAction.READ);
+ authorizeFlow();
// create the about dto
final AboutDTO aboutDTO = new AboutDTO();
@@ -1139,7 +1137,7 @@ public class FlowResource extends ApplicationResource {
)
@QueryParam("limit") IntegerParameter limit) throws InterruptedException {
- authorizeFlow(RequestAction.READ);
+ authorizeFlow();
// replicate if cluster manager
if (isReplicateRequest()) {
@@ -1230,7 +1228,7 @@ public class FlowResource extends ApplicationResource {
)
@PathParam("id") String id) throws InterruptedException {
- authorizeFlow(RequestAction.READ);
+ authorizeFlow();
// ensure a valid request
if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) {
@@ -1312,7 +1310,7 @@ public class FlowResource extends ApplicationResource {
)
@PathParam("id") String id) throws InterruptedException {
- authorizeFlow(RequestAction.READ);
+ authorizeFlow();
// ensure a valid request
if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) {
@@ -1394,7 +1392,7 @@ public class FlowResource extends ApplicationResource {
)
@PathParam("id") String id) throws InterruptedException {
- authorizeFlow(RequestAction.READ);
+ authorizeFlow();
// ensure a valid request
if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) {
@@ -1476,7 +1474,7 @@ public class FlowResource extends ApplicationResource {
)
@PathParam("id") String id) throws InterruptedException {
- authorizeFlow(RequestAction.READ);
+ authorizeFlow();
// ensure a valid request
if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) {
@@ -1567,7 +1565,7 @@ public class FlowResource extends ApplicationResource {
)
@PathParam("id") String groupId) throws InterruptedException {
- authorizeFlow(RequestAction.READ);
+ authorizeFlow();
// ensure a valid request
if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) {
@@ -1670,7 +1668,7 @@ public class FlowResource extends ApplicationResource {
)
@PathParam("id") String id) throws InterruptedException {
- authorizeFlow(RequestAction.READ);
+ authorizeFlow();
// ensure a valid request
if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) {
@@ -1746,7 +1744,7 @@ public class FlowResource extends ApplicationResource {
)
@PathParam("id") String id) throws InterruptedException {
- authorizeFlow(RequestAction.READ);
+ authorizeFlow();
// replicate if cluster manager
if (isReplicateRequest()) {
@@ -1801,7 +1799,7 @@ public class FlowResource extends ApplicationResource {
)
@PathParam("id") String groupId) throws InterruptedException {
- authorizeFlow(RequestAction.READ);
+ authorizeFlow();
// replicate if cluster manager
if (isReplicateRequest()) {
@@ -1856,7 +1854,7 @@ public class FlowResource extends ApplicationResource {
)
@PathParam("id") String id) throws InterruptedException {
- authorizeFlow(RequestAction.READ);
+ authorizeFlow();
// replicate if cluster manager
if (isReplicateRequest()) {
@@ -1911,7 +1909,7 @@ public class FlowResource extends ApplicationResource {
)
@PathParam("id") String id) throws InterruptedException {
- authorizeFlow(RequestAction.READ);
+ authorizeFlow();
// replicate if cluster manager
if (isReplicateRequest()) {
@@ -2022,7 +2020,7 @@ public class FlowResource extends ApplicationResource {
)
@QueryParam("sourceId") String sourceId) {
- authorizeFlow(RequestAction.READ);
+ authorizeFlow();
// ensure the page is specified
if (offset == null) {
@@ -2130,7 +2128,7 @@ public class FlowResource extends ApplicationResource {
)
@PathParam("id") IntegerParameter id) {
- authorizeFlow(RequestAction.READ);
+ authorizeFlow();
// ensure the id was specified
if (id == null) {
@@ -2188,7 +2186,7 @@ public class FlowResource extends ApplicationResource {
)
@PathParam("componentId") final String componentId) {
- authorizeFlow(RequestAction.READ);
+ authorizeFlow();
if (isReplicateRequest()) {
return replicate(HttpMethod.GET);
@@ -2240,7 +2238,7 @@ public class FlowResource extends ApplicationResource {
}
// authorize access
- authorizeFlow(RequestAction.READ);
+ authorizeFlow();
// get all the templates
final Set<TemplateEntity> templates = serviceFacade.getTemplates();
@@ -2295,7 +2293,7 @@ public class FlowResource extends ApplicationResource {
)
@QueryParam("q") @DefaultValue(StringUtils.EMPTY) String value) {
- authorizeFlow(RequestAction.READ);
+ authorizeFlow();
// ensure connected to the cluster
if (!isConnectedToCluster()) {