You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2016/07/12 21:00:10 UTC

[7/9] nifi git commit: NIFI-2095: - Adding a page for managing users and groups. - Adding a page for managing access policies. - Renaming accessPolicy in entity to permissions to avoid confusion with the accessPolicy model. - Adding an Authorizable for a

http://git-wip-us.apache.org/repos/asf/nifi/blob/e0c96794/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/AccessPolicyResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/AccessPolicyResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/AccessPolicyResource.java
index dd537cd..2344a5c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/AccessPolicyResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/AccessPolicyResource.java
@@ -98,6 +98,74 @@ public class AccessPolicyResource extends ApplicationResource {
         return accessPolicy;
     }
 
+    // -----------------
+    // get access policy
+    // -----------------
+
+    /**
+     * Looks up the access policy that governs the given action on the given resource.
+     *
+     * @param action the request action; parsed with {@code RequestAction.valueOfValue}
+     * @param rawResource the resource portion of the request path; a leading slash is prepended before the lookup
+     * @return An accessPolicyEntity.
+     */
+    @GET
+    @Consumes(MediaType.WILDCARD)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("{action}/{resource: .+}")
+    // TODO - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
+    @ApiOperation(
+            value = "Gets an access policy",
+            response = AccessPolicyEntity.class,
+            authorizations = {
+                    @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
+                    @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"),
+                    @Authorization(value = "Administrator", type = "ROLE_ADMIN")
+            }
+    )
+    @ApiResponses(
+            value = {
+                    @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+                    @ApiResponse(code = 401, message = "Client could not be authenticated."),
+                    @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+                    @ApiResponse(code = 404, message = "The specified resource could not be found."),
+                    @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+            }
+    )
+    public Response getAccessPolicyForResource(
+            @ApiParam(
+                    value = "The request action.",
+                    allowableValues = "read, write",
+                    required = true
+            ) @PathParam("action") final String action,
+            @ApiParam(
+                    value = "The resource of the policy.",
+                    required = true
+            ) @PathParam("resource") String rawResource) {
+
+        // parse the action first, then rebuild the resource identifier with its leading slash
+        final RequestAction policyAction = RequestAction.valueOfValue(action);
+        final String resourceIdentifier = "/" + rawResource;
+
+        if (isReplicateRequest()) {
+            return replicate(HttpMethod.GET);
+        }
+
+        // the current user needs READ on the policy that covers this resource
+        serviceFacade.authorizeAccess(authorizables -> {
+            final Authorizable policyAuthorizable = authorizables.getAccessPolicyByResource(resourceIdentifier);
+            policyAuthorizable.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
+        });
+
+        // load the policy and fill in the remaining entity content before responding
+        final AccessPolicyEntity policyEntity = serviceFacade.getAccessPolicy(policyAction, resourceIdentifier);
+        populateRemainingAccessPolicyEntityContent(policyEntity);
+
+        return clusterContext(generateOkResponse(policyEntity)).build();
+    }
+
+    // -----------------------
+    // manage an access policy
+    // -----------------------
+
     /**
      * Creates a new access policy.
      *
@@ -111,58 +179,6 @@ public class AccessPolicyResource extends ApplicationResource {
     // TODO - @PreAuthorize("hasRole('ROLE_DFM')")
     @ApiOperation(
             value = "Creates an access policy",
-            notes = "    Available resources:\n" +
-                    "        /flow                       - READ - allows user/entity to load the UI and see the flow structure\n" +
-                    "                                    - WRITE - NA\n" +
-                    "        /resource                   - READ - allows user/entity to retrieve the available resources\n" +
-                    "                                    - WRITE - NA\n" +
-                    "        /system                     - READ - allows user/entity to retrieve system level diagnostics (CPU load, disk utilization, etc)\n" +
-                    "                                    - WRITE - NA\n" +
-                    "        /controller                 - READ - allows user/entity to retrieve configuration details for the controller (controller bulletins, thread pool, reporting tasks, etc)\n" +
-                    "                                    - WRITE - allows user/entity to modify configuration details for the controller\n" +
-                    "        /provenance                 - READ - allows user/entity to perform provenance requests. results will be filtered based on access to provenance data per component\n" +
-                    "                                    - WRITE - NA\n" +
-                    "        /token                      - READ - NA\n" +
-                    "                                    - WRITE - allows user/entity to create a token for access the REST API\n" +
-                    "        /site-to-site               - READ - allows user/entity to retrieve configuration details for performing site to site data transfers with this NiFi\n" +
-                    "                                    - WRITE - NA\n" +
-                    "        /proxy                      - READ - NA\n" +
-                    "                                    - WRITE - allows user/entity to create a proxy request on behalf of another user\n" +
-                    "        /process-groups/{id}        - READ - allows user/entity to retrieve configuration details for the process group and all descendant components without explicit " +
-                    "access policies\n" +
-                    "                                    - WRITE - allows user/entity to create/update/delete configuration details for the process group and all descendant components without " +
-                    "explicit access policies\n" +
-                    "        /processors/{id}            - READ - allows user/entity to retrieve configuration details for the processor overriding any inherited authorizations from an ancestor " +
-                    "process group\n" +
-                    "                                    - WRITE - allows user/entity to update/delete the processor overriding any inherited authorizations from an ancestor process group\n" +
-                    "        /input-ports/{id}           - READ - allows user/entity to retrieve configuration details for the input port overriding any inherited authorizations from an ancestor " +
-                    "process group\n" +
-                    "                                    - WRITE - allows user/entity to update/delete the input port overriding any inherited authorizations from an ancestor process group\n" +
-                    "        /output-ports/{id}          - READ - allows user/entity to retrieve configuration details for the output port overriding any inherited authorizations from an ancestor " +
-                    "process group\n" +
-                    "                                    - WRITE - allows user/entity to update/delete the output port overriding any inherited authorizations from an ancestor process group\n" +
-                    "        /labels/{id}                - READ - allows user/entity to retrieve configuration details for the label overriding any inherited authorizations from an ancestor " +
-                    "process group\n" +
-                    "                                    - WRITE - allows user/entity to update/delete the label overriding any inherited authorizations from an ancestor process group\n" +
-                    "        /connections/{id}           - READ - allows user/entity to retrieve configuration details for the connection overriding any inherited authorizations from an ancestor " +
-                    "process group\n" +
-                    "                                    - WRITE - allows user/entity to update/delete the label overriding any inherited authorizations from an ancestor process group\n" +
-                    "        /remote-process-groups/{id} - READ - allows user/entity to retrieve configuration details for the remote process group overriding any inherited authorizations from an " +
-                    "ancestor process group\n" +
-                    "                                    - WRITE - allows user/entity to update/delete the remote process group overriding any inherited authorizations from an ancestor process " +
-                    "group\n" +
-                    "        /templates/{id}             - READ - allows user/entity to retrieve configuration details for the template overriding any inherited authorizations from an ancestor " +
-                    "process group\n" +
-                    "                                    - WRITE - allows user/entity to create/update/delete the template overriding any inherited authorizations from an ancestor process group\n" +
-                    "        /controller-services/{id}   - READ - allows user/entity to retrieve configuration details for the controller service overriding any inherited authorizations from an " +
-                    "ancestor process group\n" +
-                    "                                    - WRITE - allows user/entity to update/delete the controller service overriding any inherited authorizations from an ancestor process " +
-                    "group\n" +
-                    "        /reporting-tasks/{id}       - READ - allows user/entity to retrieve configuration details for the reporting tasks overriding any inherited authorizations from the " +
-                    "controller\n" +
-                    "                                    - WRITE - allows user/entity to create/update/delete the reporting tasks overriding any inherited authorizations from the controller\n" +
-                    "        /{type}/{id}/provenance     - READ - allows user/entity to view provenance data from the underlying component\n" +
-                    "                                    - WRITE - NA\n",
             response = AccessPolicyEntity.class,
             authorizations = {
                     @Authorization(value = "Data Flow Manager", type = "ROLE_DFM")
@@ -192,10 +208,18 @@ public class AccessPolicyResource extends ApplicationResource {
             throw new IllegalArgumentException("A revision of 0 must be specified when creating a new Policy.");
         }
 
-        if (accessPolicyEntity.getComponent().getId() != null) {
+        final AccessPolicyDTO requestAccessPolicy = accessPolicyEntity.getComponent();
+        if (requestAccessPolicy.getId() != null) {
             throw new IllegalArgumentException("Access policy ID cannot be specified.");
         }
 
+        if (requestAccessPolicy.getResource() == null) {
+            throw new IllegalArgumentException("Access policy resource must be specified.");
+        }
+
+        // ensure this is a valid action
+        RequestAction.valueOfValue(requestAccessPolicy.getAction());
+
         if (isReplicateRequest()) {
             return replicate(HttpMethod.POST, accessPolicyEntity);
         }
@@ -205,7 +229,7 @@ public class AccessPolicyResource extends ApplicationResource {
         if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) {
             // authorize access
             serviceFacade.authorizeAccess(lookup -> {
-                final Authorizable accessPolicies = lookup.getAccessPoliciesAuthorizable();
+                final Authorizable accessPolicies = lookup.getAccessPolicyByResource(requestAccessPolicy.getResource());
                 accessPolicies.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
             });
         }
@@ -270,8 +294,8 @@ public class AccessPolicyResource extends ApplicationResource {
 
         // authorize access
         serviceFacade.authorizeAccess(lookup -> {
-            final Authorizable accessPolicy = lookup.getAccessPolicyAuthorizable(id);
-            accessPolicy.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
+            Authorizable authorizable  = lookup.getAccessPolicyById(id);
+            authorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
         });
 
         // get the access policy
@@ -347,8 +371,8 @@ public class AccessPolicyResource extends ApplicationResource {
                 serviceFacade,
                 revision,
                 lookup -> {
-                    Authorizable authorizable  = lookup.getAccessPolicyAuthorizable(id);
-                authorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
+                    Authorizable authorizable  = lookup.getAccessPolicyById(id);
+                    authorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
                 },
                 null,
                 () -> {
@@ -422,8 +446,8 @@ public class AccessPolicyResource extends ApplicationResource {
                 serviceFacade,
                 revision,
                 lookup -> {
-                    final Authorizable accessPolicy = lookup.getAccessPolicyAuthorizable(id);
-                accessPolicy.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
+                    final Authorizable accessPolicy = lookup.getAccessPolicyById(id);
+                    accessPolicy.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
                 },
                 () -> {
                 },

http://git-wip-us.apache.org/repos/asf/nifi/blob/e0c96794/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
index 42f836c..faa1ab4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
@@ -16,31 +16,10 @@
  */
 package org.apache.nifi.web.api;
 
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.charset.StandardCharsets;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.UUID;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
-
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import javax.ws.rs.core.CacheControl;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MultivaluedMap;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.Response.ResponseBuilder;
-import javax.ws.rs.core.UriBuilder;
-import javax.ws.rs.core.UriBuilderException;
-import javax.ws.rs.core.UriInfo;
-
+import com.sun.jersey.api.core.HttpContext;
+import com.sun.jersey.api.representation.Form;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
+import com.sun.jersey.server.impl.model.method.dispatch.FormDispatchProvider;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.authorization.Authorizer;
 import org.apache.nifi.authorization.RequestAction;
@@ -54,6 +33,13 @@ import org.apache.nifi.cluster.manager.exception.NoClusterCoordinatorException;
 import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.controller.Snippet;
+import org.apache.nifi.remote.HttpRemoteSiteListener;
+import org.apache.nifi.remote.VersionNegotiator;
+import org.apache.nifi.remote.exception.BadRequestException;
+import org.apache.nifi.remote.exception.HandshakeException;
+import org.apache.nifi.remote.exception.NotAuthorizedException;
+import org.apache.nifi.remote.protocol.ResponseCode;
+import org.apache.nifi.remote.protocol.http.HttpHeaders;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.web.AuthorizableLookup;
 import org.apache.nifi.web.AuthorizeAccess;
@@ -62,14 +48,40 @@ import org.apache.nifi.web.Revision;
 import org.apache.nifi.web.api.dto.RevisionDTO;
 import org.apache.nifi.web.api.dto.SnippetDTO;
 import org.apache.nifi.web.api.entity.ComponentEntity;
+import org.apache.nifi.web.api.entity.TransactionResultEntity;
 import org.apache.nifi.web.api.request.ClientIdParameter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.sun.jersey.api.core.HttpContext;
-import com.sun.jersey.api.representation.Form;
-import com.sun.jersey.core.util.MultivaluedMapImpl;
-import com.sun.jersey.server.impl.model.method.dispatch.FormDispatchProvider;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.core.CacheControl;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.ResponseBuilder;
+import javax.ws.rs.core.UriBuilder;
+import javax.ws.rs.core.UriBuilderException;
+import javax.ws.rs.core.UriInfo;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static javax.ws.rs.core.Response.Status.NOT_FOUND;
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_NAME;
+import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_VALUE;
 
 /**
  * Base class for controllers.
@@ -712,4 +724,152 @@ public abstract class ApplicationResource {
     public static enum ReplicationTarget {
         CLUSTER_NODES, CLUSTER_COORDINATOR;
     }
+
+    // -----------------
+    // HTTP site to site
+    // -----------------
+
+    /**
+     * Determines the transport protocol version to use for an HTTP site-to-site request.
+     *
+     * The requested version is read from the {@code HttpHeaders.PROTOCOL_VERSION} request header. It is
+     * returned as-is when the negotiator supports it; otherwise the negotiator's preferred version for
+     * that request is returned.
+     *
+     * @param req the incoming request carrying the protocol version header
+     * @param transportProtocolVersionNegotiator the negotiator consulted for supported and preferred versions
+     * @return the protocol version to use
+     * @throws BadRequestException if the header is missing or empty, is not a valid integer,
+     *                             or the negotiator offers no preferred version
+     */
+    protected Integer negotiateTransportProtocolVersion(final HttpServletRequest req, final VersionNegotiator transportProtocolVersionNegotiator) throws BadRequestException {
+        final String versionHeader = req.getHeader(HttpHeaders.PROTOCOL_VERSION);
+        if (isEmpty(versionHeader)) {
+            throw new BadRequestException("Protocol version was not specified.");
+        }
+
+        final Integer requestedVersion;
+        try {
+            requestedVersion = Integer.valueOf(versionHeader);
+        } catch (NumberFormatException e) {
+            throw new BadRequestException("Specified protocol version was not in a valid number format: " + versionHeader);
+        }
+
+        // an exact match wins; no further negotiation is needed
+        if (transportProtocolVersionNegotiator.isVersionSupported(requestedVersion)) {
+            return requestedVersion;
+        }
+
+        // otherwise fall back to whatever the negotiator prefers for this request
+        final Integer preferredVersion = transportProtocolVersionNegotiator.getPreferredVersion(requestedVersion);
+        if (preferredVersion == null) {
+            throw new BadRequestException("Specified protocol version is not supported: " + versionHeader);
+        }
+        return preferredVersion;
+    }
+
+    protected Response.ResponseBuilder setCommonHeaders(final Response.ResponseBuilder builder, final Integer transportProtocolVersion, final HttpRemoteSiteListener transactionManager) {
+        return builder.header(HttpHeaders.PROTOCOL_VERSION, transportProtocolVersion)
+                .header(HttpHeaders.SERVER_SIDE_TRANSACTION_TTL, transactionManager.getTransactionTtlSec());
+    }
+
+    /**
+     * Builds the HTTP responses returned by the HTTP site-to-site endpoints.
+     *
+     * Error responses that carry a body use a {@link TransactionResultEntity} serialized as JSON, except
+     * for the two FORBIDDEN responses which are plain text.
+     */
+    protected class ResponseCreator {
+
+        /**
+         * Creates a result entity for a response in which no FlowFiles were sent.
+         *
+         * @param responseCode the site-to-site response code to report
+         * @param message the message to report
+         * @return the populated entity
+         */
+        private TransactionResultEntity createResultEntity(final ResponseCode responseCode, final String message) {
+            final TransactionResultEntity entity = new TransactionResultEntity();
+            entity.setResponseCode(responseCode.getCode());
+            entity.setMessage(message);
+            entity.setFlowFileSent(0);
+            return entity;
+        }
+
+        /**
+         * @param errMsg the plain text body of the response
+         * @return a non-cacheable 403 FORBIDDEN plain text response
+         */
+        public Response nodeTypeErrorResponse(String errMsg) {
+            return noCache(Response.status(Response.Status.FORBIDDEN)).type(MediaType.TEXT_PLAIN).entity(errMsg).build();
+        }
+
+        /**
+         * @return a non-cacheable 403 FORBIDDEN plain text response stating that HTTP(S) site-to-site is disabled
+         */
+        public Response httpSiteToSiteIsNotEnabledResponse() {
+            return noCache(Response.status(Response.Status.FORBIDDEN)).type(MediaType.TEXT_PLAIN).entity("HTTP(S) Site-to-Site is not enabled on this host.").build();
+        }
+
+        /**
+         * @param portType the port type from the request, logged at debug level
+         * @param portId the port id from the request, logged at debug level
+         * @return a 404 NOT_FOUND JSON response with an ABORT result entity
+         */
+        public Response wrongPortTypeResponse(String portType, String portId) {
+            logger.debug("Port type was wrong. portType={}, portId={}", portType, portId);
+            final TransactionResultEntity entity = createResultEntity(ResponseCode.ABORT, "Port was not found.");
+            return Response.status(NOT_FOUND).entity(entity).type(MediaType.APPLICATION_JSON_TYPE).build();
+        }
+
+        /**
+         * @param portId the port id from the request, logged at debug level
+         * @param transactionId the transaction id from the request, logged at debug level
+         * @return a 404 NOT_FOUND JSON response with an ABORT result entity
+         */
+        public Response transactionNotFoundResponse(String portId, String transactionId) {
+            logger.debug("Transaction was not found. portId={}, transactionId={}", portId, transactionId);
+            final TransactionResultEntity entity = createResultEntity(ResponseCode.ABORT, "Transaction was not found.");
+            return Response.status(NOT_FOUND).entity(entity).type(MediaType.APPLICATION_JSON_TYPE).build();
+        }
+
+        /**
+         * @param portId the port id from the request, logged at error level
+         * @param e the unexpected exception, logged with its stack trace
+         * @return a server error JSON response with an ABORT result entity; the exception detail is not exposed
+         */
+        public Response unexpectedErrorResponse(String portId, Exception e) {
+            logger.error("Unexpected exception occurred. portId={}", portId);
+            logger.error("Exception detail:", e);
+            final TransactionResultEntity entity = createResultEntity(ResponseCode.ABORT, "Server encountered an exception.");
+            return Response.serverError().entity(entity).type(MediaType.APPLICATION_JSON_TYPE).build();
+        }
+
+        /**
+         * @param portId the port id from the request, logged at error level
+         * @param transactionId the transaction id from the request, logged at error level
+         * @param e the unexpected exception, logged with its stack trace
+         * @return a server error JSON response with an ABORT result entity; the exception detail is not exposed
+         */
+        public Response unexpectedErrorResponse(String portId, String transactionId, Exception e) {
+            logger.error("Unexpected exception occurred. portId={}, transactionId={}", portId, transactionId);
+            logger.error("Exception detail:", e);
+            final TransactionResultEntity entity = createResultEntity(ResponseCode.ABORT, "Server encountered an exception.");
+            return Response.serverError().entity(entity).type(MediaType.APPLICATION_JSON_TYPE).build();
+        }
+
+        /**
+         * @param e the authorization failure whose message is reported to the client
+         * @return a 401 UNAUTHORIZED JSON response with an UNAUTHORIZED result entity
+         */
+        public Response unauthorizedResponse(NotAuthorizedException e) {
+            logger.debug("Client request was not authorized. {}", e.getMessage());
+            final TransactionResultEntity entity = createResultEntity(ResponseCode.UNAUTHORIZED, e.getMessage());
+            // send the entity itself: the response is declared as JSON, so the body must be the
+            // serialized entity rather than the bare message string
+            return Response.status(Response.Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON_TYPE).entity(entity).build();
+        }
+
+        /**
+         * @param e the exception describing why the request was rejected; its message is reported to the client
+         * @return a 400 BAD_REQUEST JSON response with an ABORT result entity
+         */
+        public Response badRequestResponse(Exception e) {
+            logger.debug("Client sent a bad request. {}", e.getMessage());
+            final TransactionResultEntity entity = createResultEntity(ResponseCode.ABORT, e.getMessage());
+            return Response.status(Response.Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON_TYPE).entity(entity).build();
+        }
+
+        /**
+         * Maps a handshake failure onto an HTTP status: SERVICE_UNAVAILABLE when the port is not in a valid
+         * state or its destination is full, UNAUTHORIZED, NOT_FOUND for an unknown port, and BAD_REQUEST for
+         * everything else, including an exception that carries no response code.
+         *
+         * @param e the handshake failure; its response code may be {@code null}, in which case ABORT is reported
+         * @return a JSON response carrying the result entity with the mapped status
+         */
+        public Response handshakeExceptionResponse(HandshakeException e) {
+            logger.debug("Handshake failed, {}", e.getMessage());
+
+            final ResponseCode handshakeRes = e.getResponseCode();
+            final TransactionResultEntity entity = createResultEntity(handshakeRes != null ? handshakeRes : ResponseCode.ABORT, e.getMessage());
+
+            final Response.Status statusCd;
+            if (handshakeRes == null) {
+                // switching on a null enum would throw a NullPointerException, so map it explicitly
+                statusCd = Response.Status.BAD_REQUEST;
+            } else {
+                switch (handshakeRes) {
+                    case PORT_NOT_IN_VALID_STATE:
+                    case PORTS_DESTINATION_FULL:
+                        statusCd = Response.Status.SERVICE_UNAVAILABLE;
+                        break;
+                    case UNAUTHORIZED:
+                        statusCd = Response.Status.UNAUTHORIZED;
+                        break;
+                    case UNKNOWN_PORT:
+                        statusCd = NOT_FOUND;
+                        break;
+                    default:
+                        statusCd = Response.Status.BAD_REQUEST;
+                        break;
+                }
+            }
+            return Response.status(statusCd).type(MediaType.APPLICATION_JSON_TYPE).entity(entity).build();
+        }
+
+        /**
+         * @param transactionManager the source of the transaction TTL header value
+         * @param entity the response body
+         * @param protocolVersion the negotiated protocol version to report in the response headers
+         * @return a non-cacheable 202 ACCEPTED response carrying the common site-to-site headers
+         */
+        public Response acceptedResponse(final HttpRemoteSiteListener transactionManager, final Object entity, final Integer protocolVersion) {
+            return noCache(setCommonHeaders(Response.status(Response.Status.ACCEPTED), protocolVersion, transactionManager))
+                    .entity(entity).build();
+        }
+
+        /**
+         * Builds a 201 CREATED response whose location is the transaction resource under
+         * {@code /data-transfer/{portType}/{portId}/transactions/{transactionId}}, resolved against the base URI.
+         *
+         * @param uriInfo the source of the base URI
+         * @param portType the port type path segment
+         * @param portId the port id path segment
+         * @param transactionId the transaction id path segment
+         * @param entity the response body
+         * @param protocolVersion the negotiated protocol version to report in the response headers
+         * @param transactionManager the source of the transaction TTL header value
+         * @return a non-cacheable response carrying the common site-to-site headers and the location intent header
+         */
+        public Response locationResponse(UriInfo uriInfo, String portType, String portId, String transactionId, Object entity,
+                                         Integer protocolVersion, final HttpRemoteSiteListener transactionManager) {
+
+            String path = "/data-transfer/" + portType + "/" + portId + "/transactions/" + transactionId;
+            URI location = uriInfo.getBaseUriBuilder().path(path).build();
+            return noCache(setCommonHeaders(Response.created(location), protocolVersion, transactionManager)
+                    .header(LOCATION_URI_INTENT_NAME, LOCATION_URI_INTENT_VALUE))
+                    .entity(entity).build();
+        }
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e0c96794/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java
new file mode 100644
index 0000000..aad8b4a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java
@@ -0,0 +1,837 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api;
+
+import com.wordnik.swagger.annotations.Api;
+import com.wordnik.swagger.annotations.ApiOperation;
+import com.wordnik.swagger.annotations.ApiParam;
+import com.wordnik.swagger.annotations.ApiResponse;
+import com.wordnik.swagger.annotations.ApiResponses;
+import com.wordnik.swagger.annotations.Authorization;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.authorization.AccessDeniedException;
+import org.apache.nifi.authorization.AuthorizationRequest;
+import org.apache.nifi.authorization.AuthorizationResult;
+import org.apache.nifi.authorization.AuthorizationResult.Result;
+import org.apache.nifi.authorization.Authorizer;
+import org.apache.nifi.authorization.RequestAction;
+import org.apache.nifi.authorization.Resource;
+import org.apache.nifi.authorization.resource.ResourceFactory;
+import org.apache.nifi.authorization.resource.ResourceType;
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.authorization.user.NiFiUserUtils;
+import org.apache.nifi.remote.HttpRemoteSiteListener;
+import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.PeerDescription;
+import org.apache.nifi.remote.StandardVersionNegotiator;
+import org.apache.nifi.remote.VersionNegotiator;
+import org.apache.nifi.remote.client.http.TransportProtocolVersionNegotiator;
+import org.apache.nifi.remote.exception.BadRequestException;
+import org.apache.nifi.remote.exception.HandshakeException;
+import org.apache.nifi.remote.exception.NotAuthorizedException;
+import org.apache.nifi.remote.exception.RequestExpiredException;
+import org.apache.nifi.remote.io.http.HttpOutput;
+import org.apache.nifi.remote.io.http.HttpServerCommunicationsSession;
+import org.apache.nifi.remote.protocol.HandshakeProperty;
+import org.apache.nifi.remote.protocol.ResponseCode;
+import org.apache.nifi.remote.protocol.http.HttpFlowFileServerProtocol;
+import org.apache.nifi.remote.protocol.http.HttpFlowFileServerProtocolImpl;
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
+import org.apache.nifi.web.api.entity.TransactionResultEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.StreamingOutput;
+import javax.ws.rs.core.UriInfo;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+import static org.apache.nifi.remote.protocol.HandshakeProperty.BATCH_COUNT;
+import static org.apache.nifi.remote.protocol.HandshakeProperty.BATCH_DURATION;
+import static org.apache.nifi.remote.protocol.HandshakeProperty.BATCH_SIZE;
+import static org.apache.nifi.remote.protocol.HandshakeProperty.REQUEST_EXPIRATION_MILLIS;
+import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_COUNT;
+import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_DURATION;
+import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_BATCH_SIZE;
+import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_REQUEST_EXPIRATION;
+import static org.apache.nifi.remote.protocol.http.HttpHeaders.HANDSHAKE_PROPERTY_USE_COMPRESSION;
+
+/**
+ * RESTful endpoint for managing a SiteToSite connection.
+ */
+@Path("/data-transfer")
+@Api(
+        value = "/data-transfer",
+        description = "Supports data transfers with this NiFi using HTTP based site to site"
+)
+public class DataTransferResource extends ApplicationResource {
+
+    private static final Logger logger = LoggerFactory.getLogger(DataTransferResource.class);
+
+    // Names of the query parameters read by the commit (DELETE) endpoints.
+    public static final String CHECK_SUM = "checksum";
+    public static final String RESPONSE_CODE = "responseCode";
+
+
+    // Accepted values of the {portType} path segment.
+    private static final String PORT_TYPE_INPUT = "input-ports";
+    private static final String PORT_TYPE_OUTPUT = "output-ports";
+
+    // Used by authorizeDataTransfer; not assigned in the visible part of this class
+    // (presumably injected through a setter - TODO confirm).
+    private Authorizer authorizer;
+    // Factory for the error and success responses returned by the endpoints.
+    private final ResponseCreator responseCreator = new ResponseCreator();
+    private final VersionNegotiator transportProtocolVersionNegotiator = new TransportProtocolVersionNegotiator(1);
+    // Creates and cancels transactions; also passed to setCommonHeaders when building responses.
+    private final HttpRemoteSiteListener transactionManager = HttpRemoteSiteListener.getInstance();
+
+    /**
+     * Checks that the current user may transfer data through the given port.
+     * A WRITE request against the data-transfer resource of the port is sent to the authorizer.
+     *
+     * Note: Protected for testing purposes
+     *
+     * @param resourceType must be {@code ResourceType.InputPort} or {@code ResourceType.OutputPort}
+     * @param identifier the port id (also passed as the third argument of getComponentResource)
+     * @throws IllegalArgumentException if the resource type is not an input or output port
+     * @throws AccessDeniedException if the authorizer does not approve the request
+     */
+    protected void authorizeDataTransfer(final ResourceType resourceType, final String identifier) {
+        final NiFiUser user = NiFiUserUtils.getNiFiUser();
+
+        final boolean isPort = ResourceType.InputPort.equals(resourceType) || ResourceType.OutputPort.equals(resourceType);
+        if (!isPort) {
+            throw new IllegalArgumentException("The resource must be an Input or Output Port.");
+        }
+
+        // TODO - use DataTransferAuthorizable after looking up underlying component for consistency
+        final Resource portResource = ResourceFactory.getComponentResource(resourceType, identifier, identifier);
+        final AuthorizationRequest.Builder requestBuilder = new AuthorizationRequest.Builder()
+                .resource(ResourceFactory.getDataTransferResource(portResource))
+                .identity(user.getIdentity())
+                .anonymous(user.isAnonymous())
+                .accessAttempt(true)
+                .action(RequestAction.WRITE);
+
+        final AuthorizationResult outcome = authorizer.authorize(requestBuilder.build());
+        if (Result.Approved.equals(outcome.getResult())) {
+            return;
+        }
+
+        // Prefer the authorizer's explanation; fall back to a generic message when it is blank.
+        final String message = StringUtils.isNotBlank(outcome.getExplanation()) ? outcome.getExplanation() : "Access is denied";
+        throw new AccessDeniedException(message);
+    }
+
+    @POST
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("{portType}/{portId}/transactions")
+    // TODO - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
+    @ApiOperation(
+            value = "Create a transaction to the specified output port or input port",
+            response = TransactionResultEntity.class,
+            authorizations = {
+                    @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
+                    @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"),
+                    @Authorization(value = "Administrator", type = "ROLE_ADMIN")
+            }
+    )
+    @ApiResponses(
+            value = {
+                    @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+                    @ApiResponse(code = 401, message = "Client could not be authenticated."),
+                    @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+                    @ApiResponse(code = 404, message = "The specified resource could not be found."),
+                    @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful."),
+                    @ApiResponse(code = 503, message = "NiFi instance is not ready for serving request, or temporarily overloaded. Retrying the same request later may be successful"),
+            }
+    )
+    public Response createPortTransaction(
+            @ApiParam(
+                    value = "The port type.",
+                    required = true,
+                    allowableValues = "input-ports, output-ports"
+            )
+            @PathParam("portType") String portType,
+            @PathParam("portId") String portId,
+            @Context HttpServletRequest req,
+            @Context ServletContext context,
+            @Context UriInfo uriInfo,
+            InputStream inputStream) {
+
+
+        if(!PORT_TYPE_INPUT.equals(portType) && !PORT_TYPE_OUTPUT.equals(portType)){
+            return responseCreator.wrongPortTypeResponse(portType, portId);
+        }
+
+        // authorize access
+        authorizeDataTransfer(PORT_TYPE_INPUT.equals(portType) ? ResourceType.InputPort : ResourceType.OutputPort, portId);
+
+        final ValidateRequestResult validationResult = validateResult(req, portId);
+        if (validationResult.errResponse != null) {
+            return validationResult.errResponse;
+        }
+
+        logger.debug("createPortTransaction request: clientId={}, portType={}, portId={}", portType, portId);
+
+        final ByteArrayOutputStream out = new ByteArrayOutputStream();
+        final String transactionId = transactionManager.createTransaction();
+        final Peer peer = constructPeer(req, inputStream, out, portId, transactionId);
+        final int transportProtocolVersion = validationResult.transportProtocolVersion;
+
+        try {
+            // Execute handshake.
+            initiateServerProtocol(peer, transportProtocolVersion);
+
+            TransactionResultEntity entity = new TransactionResultEntity();
+            entity.setResponseCode(ResponseCode.PROPERTIES_OK.getCode());
+            entity.setMessage("Handshake properties are valid, and port is running. A transaction is created:" + transactionId);
+
+            return responseCreator.locationResponse(uriInfo, portType, portId, transactionId, entity, transportProtocolVersion, transactionManager);
+
+        } catch (HandshakeException e) {
+            transactionManager.cancelTransaction(transactionId);
+            return responseCreator.handshakeExceptionResponse(e);
+
+        } catch (Exception e) {
+            transactionManager.cancelTransaction(transactionId);
+            return responseCreator.unexpectedErrorResponse(portId, e);
+        }
+    }
+
+    @POST
+    @Consumes(MediaType.APPLICATION_OCTET_STREAM)
+    @Produces(MediaType.TEXT_PLAIN)
+    @Path("input-ports/{portId}/transactions/{transactionId}/flow-files")
+    // TODO - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
+    @ApiOperation(
+            value = "Transfer flow files to the input port",
+            response = String.class,
+            authorizations = {
+                    @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
+                    @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"),
+                    @Authorization(value = "Administrator", type = "ROLE_ADMIN")
+            }
+    )
+    @ApiResponses(
+            value = {
+                    @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+                    @ApiResponse(code = 401, message = "Client could not be authenticated."),
+                    @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+                    @ApiResponse(code = 404, message = "The specified resource could not be found."),
+                    @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful."),
+                    @ApiResponse(code = 503, message = "NiFi instance is not ready for serving request, or temporarily overloaded. Retrying the same request later may be successful"),
+            }
+    )
+    public Response receiveFlowFiles(
+            @ApiParam(
+                    value = "The input port id.",
+                    required = true
+            )
+            @PathParam("portId") String portId,
+            @PathParam("transactionId") String transactionId,
+            @Context HttpServletRequest req,
+            @Context ServletContext context,
+            InputStream inputStream) {
+
+        // authorize access
+        authorizeDataTransfer(ResourceType.InputPort, portId);
+
+        final ValidateRequestResult validationResult = validateResult(req, portId, transactionId);
+        if (validationResult.errResponse != null) {
+            return validationResult.errResponse;
+        }
+
+        logger.debug("receiveFlowFiles request: portId={}", portId);
+
+        final ByteArrayOutputStream out = new ByteArrayOutputStream();
+        final Peer peer = constructPeer(req, inputStream, out, portId, transactionId);
+        final int transportProtocolVersion = validationResult.transportProtocolVersion;
+
+        try {
+            HttpFlowFileServerProtocol serverProtocol = initiateServerProtocol(peer, transportProtocolVersion);
+            int numOfFlowFiles = serverProtocol.getPort().receiveFlowFiles(peer, serverProtocol);
+            logger.debug("finished receiving flow files, numOfFlowFiles={}", numOfFlowFiles);
+            if (numOfFlowFiles < 1) {
+                return Response.status(Response.Status.BAD_REQUEST)
+                        .entity("Client should send request when there is data to send. There was no flow file sent.").build();
+            }
+        } catch (HandshakeException e) {
+            return responseCreator.handshakeExceptionResponse(e);
+
+        } catch (NotAuthorizedException e) {
+            return responseCreator.unauthorizedResponse(e);
+
+        } catch (BadRequestException | RequestExpiredException e) {
+            return responseCreator.badRequestResponse(e);
+
+        } catch (Exception e) {
+            return responseCreator.unexpectedErrorResponse(portId, e);
+        }
+
+        String serverChecksum = ((HttpServerCommunicationsSession)peer.getCommunicationsSession()).getChecksum();
+        return responseCreator.acceptedResponse(transactionManager, serverChecksum, transportProtocolVersion);
+    }
+
+    private HttpFlowFileServerProtocol initiateServerProtocol(Peer peer, Integer transportProtocolVersion) throws IOException {
+        // Switch transaction protocol version based on transport protocol version.
+        TransportProtocolVersionNegotiator negotiatedTransportProtocolVersion = new TransportProtocolVersionNegotiator(transportProtocolVersion);
+        VersionNegotiator versionNegotiator = new StandardVersionNegotiator(negotiatedTransportProtocolVersion.getTransactionProtocolVersion());
+        HttpFlowFileServerProtocol serverProtocol = getHttpFlowFileServerProtocol(versionNegotiator);
+        HttpRemoteSiteListener.getInstance().setupServerProtocol(serverProtocol);
+        // TODO: How should I pass cluster information?
+        // serverProtocol.setNodeInformant(clusterManager);
+        serverProtocol.handshake(peer);
+        return serverProtocol;
+    }
+
+    HttpFlowFileServerProtocol getHttpFlowFileServerProtocol(VersionNegotiator versionNegotiator) {
+        return new HttpFlowFileServerProtocolImpl(versionNegotiator);
+    }
+
+    /**
+     * Builds the {@code Peer} that represents the requesting client.
+     *
+     * The peer description is taken from the request's remote host, remote port and secure flag.
+     * Handshake parameters are copied from the request headers into the communications session;
+     * optional headers are only copied when they are non-empty. For secure requests the identity
+     * of the current user is set on the session.
+     *
+     * @param req the HTTP request
+     * @param inputStream stream the communications session reads from
+     * @param outputStream stream the communications session writes to
+     * @param portId the port id, stored as the PORT_IDENTIFIER handshake parameter
+     * @param transactionId the transaction id the session is created with
+     * @return the constructed peer
+     */
+    private Peer constructPeer(HttpServletRequest req, InputStream inputStream, OutputStream outputStream, String portId, String transactionId) {
+        final String remoteHost = req.getRemoteHost();
+        final int remotePort = req.getRemotePort();
+
+        final PeerDescription description = new PeerDescription(remoteHost, remotePort, req.isSecure());
+
+        final HttpServerCommunicationsSession session = new HttpServerCommunicationsSession(inputStream, outputStream, transactionId);
+
+        // A missing or empty header parses to false, exactly like an explicit "false".
+        final boolean useCompression = Boolean.parseBoolean(req.getHeader(HANDSHAKE_PROPERTY_USE_COMPRESSION));
+
+        final String expirationHeader = req.getHeader(HANDSHAKE_PROPERTY_REQUEST_EXPIRATION);
+        final String batchCountHeader = req.getHeader(HANDSHAKE_PROPERTY_BATCH_COUNT);
+        final String batchSizeHeader = req.getHeader(HANDSHAKE_PROPERTY_BATCH_SIZE);
+        final String batchDurationHeader = req.getHeader(HANDSHAKE_PROPERTY_BATCH_DURATION);
+
+        session.putHandshakeParam(HandshakeProperty.PORT_IDENTIFIER, portId);
+        session.putHandshakeParam(HandshakeProperty.GZIP, String.valueOf(useCompression));
+
+        if (!isEmpty(expirationHeader)) {
+            session.putHandshakeParam(REQUEST_EXPIRATION_MILLIS, expirationHeader);
+        }
+        if (!isEmpty(batchCountHeader)) {
+            session.putHandshakeParam(BATCH_COUNT, batchCountHeader);
+        }
+        if (!isEmpty(batchSizeHeader)) {
+            session.putHandshakeParam(BATCH_SIZE, batchSizeHeader);
+        }
+        if (!isEmpty(batchDurationHeader)) {
+            session.putHandshakeParam(BATCH_DURATION, batchDurationHeader);
+        }
+
+        if (description.isSecure()) {
+            final NiFiUser currentUser = NiFiUserUtils.getNiFiUser();
+            logger.debug("initiating peer, nifiUser={}", currentUser);
+            session.setUserDn(currentUser.getIdentity());
+        }
+
+        // TODO: Followed how SocketRemoteSiteListener define peerUrl and clusterUrl, but it can be more meaningful values, especially for clusterUrl.
+        final String peerUrl = "nifi://" + remoteHost + ":" + remotePort;
+        final String clusterUrl = "nifi://localhost:" + req.getLocalPort();
+        return new Peer(description, session, peerUrl, clusterUrl);
+    }
+
+    /**
+     * Commits or cancels a transaction on an output port.
+     *
+     * After authorization, request validation and the handshake, the response code sent by the
+     * client is checked: a missing or unsupported code yields 400 (Bad Request) with an ABORT
+     * entity, CANCEL_TRANSACTION cancels the transaction, and CONFIRM_TRANSACTION commits the
+     * transfer using the client's checksum. If the commit fails and the session reports
+     * BAD_CHECKSUM, 400 (Bad Request) is returned with that code and the exception message.
+     *
+     * @param responseCode CONFIRM_TRANSACTION or CANCEL_TRANSACTION code sent by the client
+     * @param checksum the checksum calculated by the client; defaults to the empty string
+     * @param portId the output port id
+     * @param transactionId the transaction id
+     * @param req the HTTP request, used for validation and to build the peer
+     * @param context the servlet context (not used by this method)
+     * @param inputStream the request body, handed to the peer's communications session
+     * @return 200 (OK) with the transaction result, or an error response
+     */
+    @DELETE
+    @Consumes(MediaType.APPLICATION_OCTET_STREAM)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("output-ports/{portId}/transactions/{transactionId}")
+    // TODO - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
+    @ApiOperation(
+            value = "Commit or cancel the specified transaction",
+            response = TransactionResultEntity.class,
+            authorizations = {
+                    @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
+                    @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"),
+                    @Authorization(value = "Administrator", type = "ROLE_ADMIN")
+            }
+    )
+    @ApiResponses(
+            value = {
+                    @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+                    @ApiResponse(code = 401, message = "Client could not be authenticated."),
+                    @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+                    @ApiResponse(code = 404, message = "The specified resource could not be found."),
+                    @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful."),
+                    @ApiResponse(code = 503, message = "NiFi instance is not ready for serving request, or temporarily overloaded. Retrying the same request later may be successful"),
+            }
+    )
+    public Response commitOutputPortTransaction(
+            @ApiParam(
+                    value = "The response code. Available values are CONFIRM_TRANSACTION(12) or CANCEL_TRANSACTION(15).",
+                    required = true
+            )
+            @QueryParam(RESPONSE_CODE) Integer responseCode,
+            @ApiParam(
+                    value = "A checksum calculated at client side using CRC32 to check flow file content integrity. It must match with the value calculated at server side.",
+                    required = true
+            )
+            @QueryParam(CHECK_SUM) @DefaultValue(StringUtils.EMPTY) String checksum,
+            @ApiParam(
+                    value = "The output port id.",
+                    required = true
+            )
+            @PathParam("portId") String portId,
+            @ApiParam(
+                    value = "The transaction id.",
+                    required = true
+            )
+            @PathParam("transactionId") String transactionId,
+            @Context HttpServletRequest req,
+            @Context ServletContext context,
+            InputStream inputStream) {
+
+        // authorize access
+        authorizeDataTransfer(ResourceType.OutputPort, portId);
+
+        final ValidateRequestResult validation = validateResult(req, portId, transactionId);
+        if (validation.errResponse != null) {
+            return validation.errResponse;
+        }
+
+        logger.debug("commitOutputPortTransaction request: portId={}, transactionId={}", portId, transactionId);
+
+        final int transportProtocolVersion = validation.transportProtocolVersion;
+        final ByteArrayOutputStream handshakeBuffer = new ByteArrayOutputStream();
+        final Peer peer = constructPeer(req, inputStream, handshakeBuffer, portId, transactionId);
+
+        final TransactionResultEntity entity = new TransactionResultEntity();
+        try {
+            final HttpFlowFileServerProtocol protocol = initiateServerProtocol(peer, transportProtocolVersion);
+
+            // Classify the client's response code; only confirm and cancel are accepted here.
+            final boolean confirmRequested = responseCode != null && ResponseCode.CONFIRM_TRANSACTION.getCode() == responseCode;
+            final boolean cancelRequested = responseCode != null && ResponseCode.CANCEL_TRANSACTION.getCode() == responseCode;
+
+            String inputErrMessage = null;
+            if (responseCode == null) {
+                inputErrMessage = "responseCode is required.";
+            } else if (!confirmRequested && !cancelRequested) {
+                inputErrMessage = "responseCode " + responseCode + " is invalid. ";
+            }
+
+            if (inputErrMessage != null) {
+                entity.setMessage(inputErrMessage);
+                entity.setResponseCode(ResponseCode.ABORT.getCode());
+                return Response.status(Response.Status.BAD_REQUEST).entity(entity).build();
+            }
+
+            if (cancelRequested) {
+                return cancelTransaction(transactionId, entity);
+            }
+
+            final int sentCount = protocol.commitTransferTransaction(peer, checksum);
+            entity.setResponseCode(ResponseCode.CONFIRM_TRANSACTION.getCode());
+            entity.setFlowFileSent(sentCount);
+
+        } catch (HandshakeException e) {
+            return responseCreator.handshakeExceptionResponse(e);
+
+        } catch (Exception e) {
+            final HttpServerCommunicationsSession session = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
+            logger.error("Failed to process the request", e);
+
+            // Anything other than a checksum mismatch is reported as an unexpected error.
+            if (!ResponseCode.BAD_CHECKSUM.equals(session.getResponseCode())) {
+                return responseCreator.unexpectedErrorResponse(portId, transactionId, e);
+            }
+
+            entity.setResponseCode(session.getResponseCode().getCode());
+            entity.setMessage(e.getMessage());
+
+            final Response.ResponseBuilder badRequest = Response.status(Response.Status.BAD_REQUEST).entity(entity);
+            return clusterContext(noCache(badRequest)).build();
+        }
+
+        return clusterContext(noCache(setCommonHeaders(Response.ok(entity), transportProtocolVersion, transactionManager))).build();
+    }
+
+
+    /**
+     * Commits or cancels a transaction on an input port.
+     *
+     * After authorization, request validation and the handshake, the response code sent by the
+     * client is checked: a missing or unsupported code yields 400 (Bad Request) with an ABORT
+     * entity, CANCEL_TRANSACTION cancels the transaction, and any other accepted code is set on
+     * the communications session before the receive transaction is committed.
+     *
+     * @param responseCode BAD_CHECKSUM, CONFIRM_TRANSACTION or CANCEL_TRANSACTION code sent by the client
+     * @param portId the input port id
+     * @param transactionId the transaction id
+     * @param req the HTTP request, used for validation and to build the peer
+     * @param context the servlet context (not used by this method)
+     * @param inputStream the request body, handed to the peer's communications session
+     * @return 200 (OK) with the transaction result, or an error response
+     */
+    @DELETE
+    @Consumes(MediaType.APPLICATION_OCTET_STREAM)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("input-ports/{portId}/transactions/{transactionId}")
+    // TODO - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
+    @ApiOperation(
+            value = "Commit or cancel the specified transaction",
+            response = TransactionResultEntity.class,
+            authorizations = {
+                    @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
+                    @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"),
+                    @Authorization(value = "Administrator", type = "ROLE_ADMIN")
+            }
+    )
+    @ApiResponses(
+            value = {
+                    @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+                    @ApiResponse(code = 401, message = "Client could not be authenticated."),
+                    @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+                    @ApiResponse(code = 404, message = "The specified resource could not be found."),
+                    @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful."),
+                    @ApiResponse(code = 503, message = "NiFi instance is not ready for serving request, or temporarily overloaded. Retrying the same request later may be successful"),
+            }
+    )
+    public Response commitInputPortTransaction(
+            @ApiParam(
+                    value = "The response code. Available values are BAD_CHECKSUM(19), CONFIRM_TRANSACTION(12) or CANCEL_TRANSACTION(15).",
+                    required = true
+            )
+            @QueryParam(RESPONSE_CODE) Integer responseCode,
+            @ApiParam(
+                    value = "The input port id.",
+                    required = true
+            )
+            @PathParam("portId") String portId,
+            @ApiParam(
+                    value = "The transaction id.",
+                    required = true
+            )
+            @PathParam("transactionId") String transactionId,
+            @Context HttpServletRequest req,
+            @Context ServletContext context,
+            InputStream inputStream) {
+
+        // authorize access
+        authorizeDataTransfer(ResourceType.InputPort, portId);
+
+        final ValidateRequestResult validationResult = validateResult(req, portId, transactionId);
+        if (validationResult.errResponse != null) {
+            return validationResult.errResponse;
+        }
+
+        logger.debug("commitInputPortTransaction request: portId={}, transactionId={}, responseCode={}",
+                portId, transactionId, responseCode);
+
+        final int transportProtocolVersion = validationResult.transportProtocolVersion;
+        final ByteArrayOutputStream out = new ByteArrayOutputStream();
+        final Peer peer = constructPeer(req, inputStream, out, portId, transactionId);
+
+        final TransactionResultEntity entity = new TransactionResultEntity();
+        try {
+            HttpFlowFileServerProtocol serverProtocol = initiateServerProtocol(peer, transportProtocolVersion);
+            HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
+            // Pass the response code sent from the client.
+            String inputErrMessage = null;
+            if (responseCode == null) {
+                inputErrMessage = "responseCode is required.";
+            } else if(ResponseCode.BAD_CHECKSUM.getCode() != responseCode
+                    && ResponseCode.CONFIRM_TRANSACTION.getCode() != responseCode
+                    && ResponseCode.CANCEL_TRANSACTION.getCode() != responseCode) {
+                inputErrMessage = "responseCode " + responseCode + " is invalid. ";
+            }
+
+            if (inputErrMessage != null){
+                entity.setMessage(inputErrMessage);
+                entity.setResponseCode(ResponseCode.ABORT.getCode());
+                return Response.status(Response.Status.BAD_REQUEST).entity(entity).build();
+            }
+
+            if (ResponseCode.CANCEL_TRANSACTION.getCode() == responseCode) {
+                return cancelTransaction(transactionId, entity);
+            }
+
+            commsSession.setResponseCode(ResponseCode.fromCode(responseCode));
+
+            try {
+                int flowFileSent = serverProtocol.commitReceiveTransaction(peer);
+                entity.setResponseCode(commsSession.getResponseCode().getCode());
+                entity.setFlowFileSent(flowFileSent);
+
+            } catch (IOException e){
+                // getMessage() may be null; guard it so that a message-less IOException is reported
+                // as itself instead of being replaced by a NullPointerException raised here.
+                final String errMessage = e.getMessage();
+                if (ResponseCode.BAD_CHECKSUM.getCode() == responseCode
+                        && errMessage != null && errMessage.contains("Received a BadChecksum response")){
+                    // AbstractFlowFileServerProtocol throws IOException after it canceled transaction.
+                    // This is a known behavior and if we return 500 with this exception,
+                    // it's not clear if there is an issue at server side, or cancel operation has been accomplished.
+                    // Above conditions can guarantee this is the latter case, we return 200 OK here.
+                    entity.setResponseCode(ResponseCode.CANCEL_TRANSACTION.getCode());
+                    return clusterContext(noCache(Response.ok(entity))).build();
+                } else {
+                    return responseCreator.unexpectedErrorResponse(portId, transactionId, e);
+                }
+            }
+
+        } catch (HandshakeException e) {
+            return responseCreator.handshakeExceptionResponse(e);
+
+        } catch (Exception e) {
+            return responseCreator.unexpectedErrorResponse(portId, transactionId, e);
+        }
+
+        return clusterContext(noCache(setCommonHeaders(Response.ok(entity), transportProtocolVersion, transactionManager))).build();
+    }
+
+    private Response cancelTransaction(String transactionId, TransactionResultEntity entity) {
+        transactionManager.cancelTransaction(transactionId);
+        entity.setMessage("Transaction has been canceled.");
+        entity.setResponseCode(ResponseCode.CANCEL_TRANSACTION.getCode());
+        return Response.ok(entity).build();
+    }
+
+
+    /**
+     * Streams flow files from the specified output port to the caller within an existing transaction.
+     * <p>
+     * The request is authorized and validated first, then the site-to-site handshake is performed
+     * while the peer writes into a temporary in-memory buffer. The flow files themselves are written
+     * later, when the returned {@link StreamingOutput} is invoked with the real response stream.
+     *
+     * @param portId        id of the output port to pull flow files from
+     * @param transactionId id of the transaction this transfer belongs to
+     * @param req           the current HTTP request
+     * @param res           the current HTTP response (not used by this method)
+     * @param context       the servlet context (not used by this method)
+     * @param inputStream   the request body, handed to the peer as its input
+     * @return the validation error response when the request is rejected, the response built by
+     *         {@code responseCreator.acceptedResponse} when the transfer is set up, or an error
+     *         response when the handshake or the setup fails
+     */
+    @GET
+    @Consumes(MediaType.WILDCARD)
+    @Produces(MediaType.APPLICATION_OCTET_STREAM)
+    @Path("output-ports/{portId}/transactions/{transactionId}/flow-files")
+    // TODO - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
+    @ApiOperation(
+            value = "Transfer flow files from the output port",
+            response = StreamingOutput.class,
+            authorizations = {
+                    @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
+                    @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"),
+                    @Authorization(value = "Administrator", type = "ROLE_ADMIN")
+            }
+    )
+    @ApiResponses(
+            value = {
+                    @ApiResponse(code = 200, message = "There is no flow file to return."),
+                    @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+                    @ApiResponse(code = 401, message = "Client could not be authenticated."),
+                    @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+                    @ApiResponse(code = 404, message = "The specified resource could not be found."),
+                    @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful."),
+                    @ApiResponse(code = 503, message = "NiFi instance is not ready for serving request, or temporarily overloaded. Retrying the same request later may be successful"),
+            }
+    )
+    public Response transferFlowFiles(
+            @ApiParam(
+                    value = "The output port id.",
+                    required = true
+            )
+            @PathParam("portId") String portId,
+            @ApiParam(
+                    value = "The transaction id.",
+                    required = true
+            )
+            @PathParam("transactionId") String transactionId,
+            @Context HttpServletRequest req,
+            @Context HttpServletResponse res,
+            @Context ServletContext context,
+            InputStream inputStream) {
+
+        // authorize access
+        authorizeDataTransfer(ResourceType.OutputPort, portId);
+
+        final ValidateRequestResult validationResult = validateResult(req, portId, transactionId);
+        if (validationResult.errResponse != null) {
+            return validationResult.errResponse;
+        }
+
+        // Log the transaction id as well so a transfer can be correlated with its transaction.
+        logger.debug("transferFlowFiles request: portId={}, transactionId={}", portId, transactionId);
+
+        // Before opening the real output stream for HTTP response,
+        // use this temporary output stream to buffer handshake result.
+        final ByteArrayOutputStream tempBos = new ByteArrayOutputStream();
+        final Peer peer = constructPeer(req, inputStream, tempBos, portId, transactionId);
+        final int transportProtocolVersion = validationResult.transportProtocolVersion;
+        try {
+            final HttpFlowFileServerProtocol serverProtocol = initiateServerProtocol(peer, transportProtocolVersion);
+
+            StreamingOutput flowFileContent = new StreamingOutput() {
+                @Override
+                public void write(OutputStream outputStream) throws IOException, WebApplicationException {
+
+                    // Switch the peer from the temporary buffer to the real response stream.
+                    HttpOutput output = (HttpOutput) peer.getCommunicationsSession().getOutput();
+                    output.setOutputStream(outputStream);
+
+                    try {
+                        int numOfFlowFiles = serverProtocol.getPort().transferFlowFiles(peer, serverProtocol);
+                        logger.debug("finished transferring flow files, numOfFlowFiles={}", numOfFlowFiles);
+                        if (numOfFlowFiles < 1) {
+                            // There was no flow file to transfer. Throw this exception so the client
+                            // receives 200 OK instead of the status prepared for an accepted transfer.
+                            throw new WebApplicationException(Response.Status.OK);
+                        }
+                    } catch (NotAuthorizedException | BadRequestException | RequestExpiredException e) {
+                        // Handshake is done outside of the write() method, so these exceptions are not expected here.
+                        throw new IOException("Failed to process the request.", e);
+                    }
+                }
+
+            };
+
+            return responseCreator.acceptedResponse(transactionManager, flowFileContent, transportProtocolVersion);
+
+        } catch (HandshakeException e) {
+            return responseCreator.handshakeExceptionResponse(e);
+
+        } catch (Exception e) {
+            // Report the transaction id too, as extendPortTransactionTTL does for the same kind of failure.
+            return responseCreator.unexpectedErrorResponse(portId, transactionId, e);
+        }
+    }
+
+    /**
+     * Extends the TTL of a transaction that was opened against an input port.
+     * <p>
+     * After data transfer is authorized for the port, all work is delegated to
+     * {@code extendPortTransactionTTL} with {@code PORT_TYPE_INPUT}. The documented responses match
+     * {@code extendOutputPortTransactionTTL}, which delegates to the same helper.
+     *
+     * @param portId        id of the input port the transaction belongs to
+     * @param transactionId id of the transaction whose TTL is extended
+     * @param req           the current HTTP request
+     * @param res           the current HTTP response
+     * @param context       the servlet context
+     * @param uriInfo       URI information of the current request
+     * @param inputStream   the request body
+     * @return the response produced by {@code extendPortTransactionTTL}
+     */
+    @PUT
+    @Consumes(MediaType.WILDCARD)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("input-ports/{portId}/transactions/{transactionId}")
+    // TODO - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
+    @ApiOperation(
+            value = "Extend transaction TTL",
+            response = TransactionResultEntity.class,
+            authorizations = {
+                    @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
+                    @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"),
+                    @Authorization(value = "Administrator", type = "ROLE_ADMIN")
+            }
+    )
+    @ApiResponses(
+            value = {
+                    @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+                    @ApiResponse(code = 401, message = "Client could not be authenticated."),
+                    @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+                    @ApiResponse(code = 404, message = "The specified resource could not be found."),
+                    @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful."),
+                    @ApiResponse(code = 503, message = "NiFi instance is not ready for serving request, or temporarily overloaded. Retrying the same request later may be successful")
+            }
+    )
+    public Response extendInputPortTransactionTTL(
+            @PathParam("portId") String portId,
+            @PathParam("transactionId") String transactionId,
+            @Context HttpServletRequest req,
+            @Context HttpServletResponse res,
+            @Context ServletContext context,
+            @Context UriInfo uriInfo,
+            InputStream inputStream) {
+
+        // authorize access
+        authorizeDataTransfer(ResourceType.InputPort, portId);
+
+        return extendPortTransactionTTL(PORT_TYPE_INPUT, portId, transactionId, req, res, context, uriInfo, inputStream);
+    }
+
+    /**
+     * Extends the TTL of a transaction that was opened against an output port.
+     * <p>
+     * Data transfer is authorized for the port first; the remaining work is delegated to
+     * {@code extendPortTransactionTTL} with {@code PORT_TYPE_OUTPUT}.
+     *
+     * @param portId        id of the output port the transaction belongs to
+     * @param transactionId id of the transaction whose TTL is extended
+     * @param req           the current HTTP request
+     * @param res           the current HTTP response
+     * @param context       the servlet context
+     * @param uriInfo       URI information of the current request
+     * @param inputStream   the request body
+     * @return the response produced by {@code extendPortTransactionTTL}
+     */
+    @PUT
+    @Consumes(MediaType.WILDCARD)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("output-ports/{portId}/transactions/{transactionId}")
+    // TODO - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
+    @ApiOperation(
+            value = "Extend transaction TTL",
+            response = TransactionResultEntity.class,
+            authorizations = {
+                    @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
+                    @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"),
+                    @Authorization(value = "Administrator", type = "ROLE_ADMIN")
+            }
+    )
+    @ApiResponses(
+            value = {
+                    @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+                    @ApiResponse(code = 401, message = "Client could not be authenticated."),
+                    @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+                    @ApiResponse(code = 404, message = "The specified resource could not be found."),
+                    @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful."),
+                    @ApiResponse(code = 503, message = "NiFi instance is not ready for serving request, or temporarily overloaded. Retrying the same request later may be successful"),
+            }
+    )
+    public Response extendOutputPortTransactionTTL(
+            @PathParam("portId") String portId,
+            @PathParam("transactionId") String transactionId,
+            @Context HttpServletRequest req,
+            @Context HttpServletResponse res,
+            @Context ServletContext context,
+            @Context UriInfo uriInfo,
+            InputStream inputStream) {
+
+        // the port must be authorized for data transfer before the transaction is touched
+        authorizeDataTransfer(ResourceType.OutputPort, portId);
+
+        final Response ttlResponse =
+                extendPortTransactionTTL(PORT_TYPE_OUTPUT, portId, transactionId, req, res, context, uriInfo, inputStream);
+        return ttlResponse;
+    }
+
+    /**
+     * Extends the TTL of a transaction on behalf of both the input-port and the output-port endpoints.
+     * <p>
+     * The request is validated, the port type is checked, the site-to-site handshake is performed and
+     * only then is the transaction manager asked to extend the transaction.
+     *
+     * @param portType      must equal {@code PORT_TYPE_INPUT} or {@code PORT_TYPE_OUTPUT}; any other value
+     *                      yields the wrong-port-type response
+     * @param portId        id of the port the transaction belongs to
+     * @param transactionId id of the transaction whose TTL is extended
+     * @param req           the current HTTP request
+     * @param res           the current HTTP response (not used by this method)
+     * @param context       the servlet context (not used by this method)
+     * @param uriInfo       URI information of the current request (not used by this method)
+     * @param inputStream   the request body, handed to the peer as its input
+     * @return the validation or port-type error response, an OK response carrying a
+     *         {@code CONTINUE_TRANSACTION} result entity, or an error response when the handshake or
+     *         the extension fails
+     */
+    public Response extendPortTransactionTTL(
+            String portType,
+            String portId,
+            String transactionId,
+            HttpServletRequest req,
+            HttpServletResponse res,
+            ServletContext context,
+            UriInfo uriInfo,
+            InputStream inputStream) {
+
+        final ValidateRequestResult validationResult = validateResult(req, portId, transactionId);
+        if (validationResult.errResponse != null) {
+            return validationResult.errResponse;
+        }
+
+        if (!PORT_TYPE_INPUT.equals(portType) && !PORT_TYPE_OUTPUT.equals(portType)) {
+            return responseCreator.wrongPortTypeResponse(portType, portId);
+        }
+
+        // This helper serves input and output ports alike, so the log label names the helper itself
+        // rather than the output-port endpoint.
+        logger.debug("extendPortTransactionTTL request: portType={}, portId={}, transactionId={}",
+                portType, portId, transactionId);
+
+        final int transportProtocolVersion = validationResult.transportProtocolVersion;
+        final ByteArrayOutputStream out = new ByteArrayOutputStream();
+        final Peer peer = constructPeer(req, inputStream, out, portId, transactionId);
+
+        try {
+            // Do handshake
+            initiateServerProtocol(peer, transportProtocolVersion);
+            transactionManager.extendsTransaction(transactionId);
+
+            final TransactionResultEntity entity = new TransactionResultEntity();
+            entity.setResponseCode(ResponseCode.CONTINUE_TRANSACTION.getCode());
+            entity.setMessage("Extended TTL.");
+            return clusterContext(noCache(setCommonHeaders(Response.ok(entity), transportProtocolVersion, transactionManager))).build();
+
+        } catch (HandshakeException e) {
+            return responseCreator.handshakeExceptionResponse(e);
+
+        } catch (Exception e) {
+            return responseCreator.unexpectedErrorResponse(portId, transactionId, e);
+        }
+
+    }
+
+    private class ValidateRequestResult {
+        private Integer transportProtocolVersion;
+        private Response errResponse;
+    }
+
+    private ValidateRequestResult validateResult(HttpServletRequest req, String portId) {
+        return validateResult(req, portId, null);
+    }
+
+    private ValidateRequestResult validateResult(HttpServletRequest req, String portId, String transactionId) {
+        ValidateRequestResult result = new ValidateRequestResult();
+        if(!properties.isSiteToSiteHttpEnabled()) {
+            result.errResponse = responseCreator.httpSiteToSiteIsNotEnabledResponse();
+            return result;
+        }
+
+        // TODO: NCM no longer exists.
+        /*
+        if (properties.isClusterManager()) {
+            result.errResponse = responseCreator.nodeTypeErrorResponse(req.getPathInfo() + " is not available on a NiFi Cluster Manager.");
+            return result;
+        }
+        */
+
+
+        try {
+            result.transportProtocolVersion = negotiateTransportProtocolVersion(req, transportProtocolVersionNegotiator);
+        } catch (BadRequestException e) {
+            result.errResponse = responseCreator.badRequestResponse(e);
+            return result;
+        }
+
+        if(!isEmpty(transactionId) && !transactionManager.isTransactionActive(transactionId)) {
+            result.errResponse = responseCreator.transactionNotFoundResponse(portId, transactionId);
+            return  result;
+        }
+
+        return result;
+    }
+
+
+    // setters
+
+    /**
+     * Stores the authorizer on this resource.
+     *
+     * @param authorizer the authorizer instance to keep
+     */
+    public void setAuthorizer(final Authorizer authorizer) {
+        this.authorizer = authorizer;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/e0c96794/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
index 4447014..b719e24 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
@@ -16,31 +16,13 @@
  */
 package org.apache.nifi.web.api;
 
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.GET;
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.PUT;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-
+import com.sun.jersey.api.core.ResourceContext;
+import com.wordnik.swagger.annotations.Api;
+import com.wordnik.swagger.annotations.ApiOperation;
+import com.wordnik.swagger.annotations.ApiParam;
+import com.wordnik.swagger.annotations.ApiResponse;
+import com.wordnik.swagger.annotations.ApiResponses;
+import com.wordnik.swagger.annotations.Authorization;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.authorization.AccessDeniedException;
 import org.apache.nifi.authorization.AuthorizationRequest;
@@ -123,13 +105,29 @@ import org.apache.nifi.web.api.request.DateTimeParameter;
 import org.apache.nifi.web.api.request.IntegerParameter;
 import org.apache.nifi.web.api.request.LongParameter;
 
-import com.sun.jersey.api.core.ResourceContext;
-import com.wordnik.swagger.annotations.Api;
-import com.wordnik.swagger.annotations.ApiOperation;
-import com.wordnik.swagger.annotations.ApiParam;
-import com.wordnik.swagger.annotations.ApiResponse;
-import com.wordnik.swagger.annotations.ApiResponses;
-import com.wordnik.swagger.annotations.Authorization;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * RESTful endpoint for managing a Flow.
@@ -208,7 +206,7 @@ public class FlowResource extends ApplicationResource {
     /**
      * Authorizes access to the flow.
      */
-    private void authorizeFlow(final RequestAction action) {
+    private void authorizeFlow() {
         final NiFiUser user = NiFiUserUtils.getNiFiUser();
 
         final Map<String,String> userContext;
@@ -224,7 +222,7 @@ public class FlowResource extends ApplicationResource {
             .identity(user.getIdentity())
             .anonymous(user.isAnonymous())
             .accessAttempt(true)
-            .action(action)
+            .action(RequestAction.READ)
             .userContext(userContext)
             .build();
 
@@ -287,7 +285,7 @@ public class FlowResource extends ApplicationResource {
             }
     )
     public Response generateClientId() {
-        authorizeFlow(RequestAction.READ);
+        authorizeFlow();
         return clusterContext(generateOkResponse(generateUuid())).build();
     }
 
@@ -321,7 +319,7 @@ public class FlowResource extends ApplicationResource {
     )
     public Response getFlowConfig() {
 
-        authorizeFlow(RequestAction.READ);
+        authorizeFlow();
 
         if (isReplicateRequest()) {
             return replicate(HttpMethod.GET);
@@ -350,7 +348,7 @@ public class FlowResource extends ApplicationResource {
     )
     public Response getCurrentUser() {
 
-        authorizeFlow(RequestAction.READ);
+        authorizeFlow();
 
         // note that the cluster manager will handle this request directly
         final NiFiUser user = NiFiUserUtils.getNiFiUser();
@@ -413,7 +411,7 @@ public class FlowResource extends ApplicationResource {
         )
         @QueryParam("recursive") @DefaultValue(RECURSIVE) Boolean recursive) throws InterruptedException {
 
-        authorizeFlow(RequestAction.READ);
+        authorizeFlow();
 
         if (isReplicateRequest()) {
             return replicate(HttpMethod.GET);
@@ -458,7 +456,7 @@ public class FlowResource extends ApplicationResource {
     )
     public Response getControllerServicesFromController() {
 
-        authorizeFlow(RequestAction.READ);
+        authorizeFlow();
 
         if (isReplicateRequest()) {
             return replicate(HttpMethod.GET);
@@ -511,7 +509,7 @@ public class FlowResource extends ApplicationResource {
         )
         @PathParam("id") String groupId) throws InterruptedException {
 
-        authorizeFlow(RequestAction.READ);
+        authorizeFlow();
 
         // get all the controller services
         final Set<ControllerServiceEntity> controllerServices = serviceFacade.getControllerServices(groupId);
@@ -566,7 +564,7 @@ public class FlowResource extends ApplicationResource {
         )
         @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId) {
 
-        authorizeFlow(RequestAction.READ);
+        authorizeFlow();
 
         if (isReplicateRequest()) {
             return replicate(HttpMethod.GET);
@@ -622,7 +620,7 @@ public class FlowResource extends ApplicationResource {
         @PathParam("id") String id,
         ScheduleComponentsEntity scheduleComponentsEntity) {
 
-        authorizeFlow(RequestAction.READ);
+        authorizeFlow();
 
         // ensure the same id is being used
         if (!id.equals(scheduleComponentsEntity.getId())) {
@@ -753,7 +751,7 @@ public class FlowResource extends ApplicationResource {
             }
     )
     public Response searchFlow(@QueryParam("q") @DefaultValue(StringUtils.EMPTY) String value) throws InterruptedException {
-        authorizeFlow(RequestAction.READ);
+        authorizeFlow();
 
         // query the controller
         final SearchResultsDTO results = serviceFacade.searchController(value);
@@ -796,7 +794,7 @@ public class FlowResource extends ApplicationResource {
     )
     public Response getControllerStatus() throws InterruptedException {
 
-        authorizeFlow(RequestAction.READ);
+        authorizeFlow();
 
         if (isReplicateRequest()) {
             return replicate(HttpMethod.GET);
@@ -841,7 +839,7 @@ public class FlowResource extends ApplicationResource {
     )
     public Response getBanners() {
 
-        authorizeFlow(RequestAction.READ);
+        authorizeFlow();
 
         // get the banner from the properties - will come from the NCM when clustered
         final String bannerText = getProperties().getBannerText();
@@ -888,7 +886,7 @@ public class FlowResource extends ApplicationResource {
             }
     )
     public Response getProcessorTypes() throws InterruptedException {
-        authorizeFlow(RequestAction.READ);
+        authorizeFlow();
 
         // create response entity
         final ProcessorTypesEntity entity = new ProcessorTypesEntity();
@@ -933,7 +931,7 @@ public class FlowResource extends ApplicationResource {
                     required = false
             )
         @QueryParam("serviceType") String serviceType) throws InterruptedException {
-        authorizeFlow(RequestAction.READ);
+        authorizeFlow();
 
         // create response entity
         final ControllerServiceTypesEntity entity = new ControllerServiceTypesEntity();
@@ -972,7 +970,7 @@ public class FlowResource extends ApplicationResource {
             }
     )
     public Response getReportingTaskTypes() throws InterruptedException {
-        authorizeFlow(RequestAction.READ);
+        authorizeFlow();
 
         // create response entity
         final ReportingTaskTypesEntity entity = new ReportingTaskTypesEntity();
@@ -1011,7 +1009,7 @@ public class FlowResource extends ApplicationResource {
             }
     )
     public Response getPrioritizers() throws InterruptedException {
-        authorizeFlow(RequestAction.READ);
+        authorizeFlow();
 
         // create response entity
         final PrioritizerTypesEntity entity = new PrioritizerTypesEntity();
@@ -1049,7 +1047,7 @@ public class FlowResource extends ApplicationResource {
             }
     )
     public Response getAboutInfo() {
-        authorizeFlow(RequestAction.READ);
+        authorizeFlow();
 
         // create the about dto
         final AboutDTO aboutDTO = new AboutDTO();
@@ -1139,7 +1137,7 @@ public class FlowResource extends ApplicationResource {
         )
         @QueryParam("limit") IntegerParameter limit) throws InterruptedException {
 
-        authorizeFlow(RequestAction.READ);
+        authorizeFlow();
 
         // replicate if cluster manager
         if (isReplicateRequest()) {
@@ -1230,7 +1228,7 @@ public class FlowResource extends ApplicationResource {
         )
         @PathParam("id") String id) throws InterruptedException {
 
-        authorizeFlow(RequestAction.READ);
+        authorizeFlow();
 
         // ensure a valid request
         if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) {
@@ -1312,7 +1310,7 @@ public class FlowResource extends ApplicationResource {
         )
         @PathParam("id") String id) throws InterruptedException {
 
-        authorizeFlow(RequestAction.READ);
+        authorizeFlow();
 
         // ensure a valid request
         if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) {
@@ -1394,7 +1392,7 @@ public class FlowResource extends ApplicationResource {
         )
         @PathParam("id") String id) throws InterruptedException {
 
-        authorizeFlow(RequestAction.READ);
+        authorizeFlow();
 
         // ensure a valid request
         if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) {
@@ -1476,7 +1474,7 @@ public class FlowResource extends ApplicationResource {
         )
         @PathParam("id") String id) throws InterruptedException {
 
-        authorizeFlow(RequestAction.READ);
+        authorizeFlow();
 
         // ensure a valid request
         if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) {
@@ -1567,7 +1565,7 @@ public class FlowResource extends ApplicationResource {
         )
         @PathParam("id") String groupId) throws InterruptedException {
 
-        authorizeFlow(RequestAction.READ);
+        authorizeFlow();
 
         // ensure a valid request
         if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) {
@@ -1670,7 +1668,7 @@ public class FlowResource extends ApplicationResource {
         )
         @PathParam("id") String id) throws InterruptedException {
 
-        authorizeFlow(RequestAction.READ);
+        authorizeFlow();
 
         // ensure a valid request
         if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) {
@@ -1746,7 +1744,7 @@ public class FlowResource extends ApplicationResource {
         )
         @PathParam("id") String id) throws InterruptedException {
 
-        authorizeFlow(RequestAction.READ);
+        authorizeFlow();
 
         // replicate if cluster manager
         if (isReplicateRequest()) {
@@ -1801,7 +1799,7 @@ public class FlowResource extends ApplicationResource {
         )
         @PathParam("id") String groupId) throws InterruptedException {
 
-        authorizeFlow(RequestAction.READ);
+        authorizeFlow();
 
         // replicate if cluster manager
         if (isReplicateRequest()) {
@@ -1856,7 +1854,7 @@ public class FlowResource extends ApplicationResource {
         )
         @PathParam("id") String id) throws InterruptedException {
 
-        authorizeFlow(RequestAction.READ);
+        authorizeFlow();
 
         // replicate if cluster manager
         if (isReplicateRequest()) {
@@ -1911,7 +1909,7 @@ public class FlowResource extends ApplicationResource {
         )
         @PathParam("id") String id) throws InterruptedException {
 
-        authorizeFlow(RequestAction.READ);
+        authorizeFlow();
 
         // replicate if cluster manager
         if (isReplicateRequest()) {
@@ -2022,7 +2020,7 @@ public class FlowResource extends ApplicationResource {
             )
             @QueryParam("sourceId") String sourceId) {
 
-        authorizeFlow(RequestAction.READ);
+        authorizeFlow();
 
         // ensure the page is specified
         if (offset == null) {
@@ -2130,7 +2128,7 @@ public class FlowResource extends ApplicationResource {
             )
             @PathParam("id") IntegerParameter id) {
 
-        authorizeFlow(RequestAction.READ);
+        authorizeFlow();
 
         // ensure the id was specified
         if (id == null) {
@@ -2188,7 +2186,7 @@ public class FlowResource extends ApplicationResource {
             )
             @PathParam("componentId") final String componentId) {
 
-        authorizeFlow(RequestAction.READ);
+        authorizeFlow();
 
         if (isReplicateRequest()) {
             return replicate(HttpMethod.GET);
@@ -2240,7 +2238,7 @@ public class FlowResource extends ApplicationResource {
         }
 
         // authorize access
-        authorizeFlow(RequestAction.READ);
+        authorizeFlow();
 
         // get all the templates
         final Set<TemplateEntity> templates = serviceFacade.getTemplates();
@@ -2295,7 +2293,7 @@ public class FlowResource extends ApplicationResource {
             )
             @QueryParam("q") @DefaultValue(StringUtils.EMPTY) String value) {
 
-        authorizeFlow(RequestAction.READ);
+        authorizeFlow();
 
         // ensure connected to the cluster
         if (!isConnectedToCluster()) {