You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2016/04/04 18:28:43 UTC
[07/18] nifi git commit: NIFI-1563: - Federate requests and merge
responses from nodes instead of storing bulletins and stats at NCM - Updating
UI to support restructured status history DTO. - Return 'Insufficient
History' message if aggregate stats don'
http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java
index 857df56..ec4c69e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java
@@ -16,24 +16,14 @@
*/
package org.apache.nifi.web.api;
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.FormParam;
-import javax.ws.rs.GET;
-import javax.ws.rs.HEAD;
-import javax.ws.rs.PUT;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-
+import com.sun.jersey.api.core.ResourceContext;
+import com.wordnik.swagger.annotations.Api;
+import com.wordnik.swagger.annotations.ApiOperation;
+import com.wordnik.swagger.annotations.ApiParam;
+import com.wordnik.swagger.annotations.ApiResponse;
+import com.wordnik.swagger.annotations.ApiResponses;
+import com.wordnik.swagger.annotations.Authorization;
+import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.cluster.manager.impl.WebClusterManager;
import org.apache.nifi.cluster.node.Node;
import org.apache.nifi.util.NiFiProperties;
@@ -47,36 +37,29 @@ import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.api.dto.RevisionDTO;
import org.apache.nifi.web.api.dto.search.NodeSearchResultDTO;
-import org.apache.nifi.web.api.dto.status.ClusterConnectionStatusDTO;
-import org.apache.nifi.web.api.dto.status.ClusterPortStatusDTO;
-import org.apache.nifi.web.api.dto.status.ClusterProcessorStatusDTO;
-import org.apache.nifi.web.api.dto.status.ClusterRemoteProcessGroupStatusDTO;
-import org.apache.nifi.web.api.dto.status.ClusterStatusDTO;
-import org.apache.nifi.web.api.dto.status.ClusterStatusHistoryDTO;
-import org.apache.nifi.web.api.entity.ClusterConnectionStatusEntity;
import org.apache.nifi.web.api.entity.ClusterEntity;
-import org.apache.nifi.web.api.entity.ClusterPortStatusEntity;
-import org.apache.nifi.web.api.entity.ClusterProcessorStatusEntity;
-import org.apache.nifi.web.api.entity.ClusterRemoteProcessGroupStatusEntity;
import org.apache.nifi.web.api.entity.ClusterSearchResultsEntity;
-import org.apache.nifi.web.api.entity.ClusterStatusEntity;
-import org.apache.nifi.web.api.entity.ClusterStatusHistoryEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.apache.nifi.web.api.request.ClientIdParameter;
import org.apache.nifi.web.api.request.LongParameter;
-
-import org.apache.commons.lang3.StringUtils;
import org.springframework.security.access.prepost.PreAuthorize;
-import com.sun.jersey.api.core.ResourceContext;
-import com.wordnik.swagger.annotations.Api;
-import com.wordnik.swagger.annotations.ApiOperation;
-import com.wordnik.swagger.annotations.ApiParam;
-import com.wordnik.swagger.annotations.ApiResponse;
-import com.wordnik.swagger.annotations.ApiResponses;
-import com.wordnik.swagger.annotations.Authorization;
-import org.apache.nifi.web.api.dto.status.ClusterProcessGroupStatusDTO;
-import org.apache.nifi.web.api.entity.ClusterProcessGroupStatusEntity;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.FormParam;
+import javax.ws.rs.GET;
+import javax.ws.rs.HEAD;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.util.ArrayList;
+import java.util.List;
/**
* RESTful endpoint for managing a cluster.
@@ -108,62 +91,6 @@ public class ClusterResource extends ApplicationResource {
}
/**
- * Gets the status of this NiFi cluster.
- *
- * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
- * @return A clusterStatusEntity
- */
- @GET
- @Consumes(MediaType.WILDCARD)
- @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
- @Path("/status")
- @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
- @ApiOperation(
- value = "Gets the status of the cluster",
- response = ClusterStatusEntity.class,
- authorizations = {
- @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
- @Authorization(value = "DFM", type = "ROLE_DFM"),
- @Authorization(value = "Admin", type = "ROLE_ADMIN")
- }
- )
- @ApiResponses(
- value = {
- @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
- @ApiResponse(code = 401, message = "Client could not be authenticated."),
- @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
- @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
- }
- )
- public Response getClusterStatus(
- @ApiParam(
- value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
- required = false
- )
- @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId) {
-
- if (properties.isClusterManager()) {
-
- ClusterStatusDTO dto = serviceFacade.getClusterStatus();
-
- // create the revision
- RevisionDTO revision = new RevisionDTO();
- revision.setClientId(clientId.getClientId());
-
- // create entity
- final ClusterStatusEntity entity = new ClusterStatusEntity();
- entity.setClusterStatus(dto);
- entity.setRevision(revision);
-
- // generate the response
- return generateOkResponse(entity).build();
- }
-
- throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request.");
-
- }
-
- /**
* Returns a 200 OK response to indicate this is a valid cluster endpoint.
*
* @return An OK response with an empty entity body.
@@ -519,622 +446,6 @@ public class ClusterResource extends ApplicationResource {
throw new IllegalClusterResourceRequestException("Only a node can process the request.");
}
- /**
- * Gets the processor status for every node.
- *
- * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
- * @param id The id of the processor
- * @return A clusterProcessorStatusEntity
- */
- @GET
- @Consumes(MediaType.WILDCARD)
- @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON})
- @Path("/processors/{id}/status")
- @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
- @ApiOperation(
- value = "Gets the processor status across the cluster",
- response = ClusterProcessorStatusEntity.class,
- authorizations = {
- @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
- @Authorization(value = "DFM", type = "ROLE_DFM"),
- @Authorization(value = "Admin", type = "ROLE_ADMIN")
- }
- )
- @ApiResponses(
- value = {
- @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
- @ApiResponse(code = 401, message = "Client could not be authenticated."),
- @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
- @ApiResponse(code = 404, message = "The specified resource could not be found."),
- @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
- }
- )
- public Response getProcessorStatus(
- @ApiParam(
- value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
- required = false
- )
- @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
- @ApiParam(
- value = "The processor id",
- required = true
- )
- @PathParam("id") String id) {
-
- if (properties.isClusterManager()) {
-
- final ClusterProcessorStatusDTO dto = serviceFacade.getClusterProcessorStatus(id);
-
- // create the revision
- RevisionDTO revision = new RevisionDTO();
- revision.setClientId(clientId.getClientId());
-
- // create entity
- final ClusterProcessorStatusEntity entity = new ClusterProcessorStatusEntity();
- entity.setClusterProcessorStatus(dto);
- entity.setRevision(revision);
-
- // generate the response
- return generateOkResponse(entity).build();
- }
-
- throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request.");
- }
-
- /**
- * Gets the processor status history for every node.
- *
- * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
- * @param id The id of the processor
- * @return A clusterProcessorStatusHistoryEntity
- */
- @GET
- @Consumes(MediaType.WILDCARD)
- @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON})
- @Path("/processors/{id}/status/history")
- @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
- @ApiOperation(
- value = "Gets processor status history across the cluster",
- response = ClusterStatusHistoryEntity.class,
- authorizations = {
- @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
- @Authorization(value = "DFM", type = "ROLE_DFM"),
- @Authorization(value = "Admin", type = "ROLE_ADMIN")
- }
- )
- @ApiResponses(
- value = {
- @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
- @ApiResponse(code = 401, message = "Client could not be authenticated."),
- @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
- @ApiResponse(code = 404, message = "The specified resource could not be found."),
- @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
- }
- )
- public Response getProcessorStatusHistory(
- @ApiParam(
- value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
- required = false
- )
- @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
- @ApiParam(
- value = "The processor id",
- required = true
- )
- @PathParam("id") String id) {
-
- if (properties.isClusterManager()) {
- final ClusterStatusHistoryDTO dto = serviceFacade.getClusterProcessorStatusHistory(id);
-
- // create the revision
- RevisionDTO revision = new RevisionDTO();
- revision.setClientId(clientId.getClientId());
-
- // create entity
- final ClusterStatusHistoryEntity entity = new ClusterStatusHistoryEntity();
- entity.setClusterStatusHistory(dto);
- entity.setRevision(revision);
-
- // generate the response
- return generateOkResponse(entity).build();
- }
-
- throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request.");
- }
-
- /**
- * Gets the connection status for every node.
- *
- * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
- * @param id The id of the processor
- * @return A clusterProcessorStatusEntity
- */
- @GET
- @Consumes(MediaType.WILDCARD)
- @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON})
- @Path("/connections/{id}/status")
- @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
- @ApiOperation(
- value = "Gets connection status across the cluster",
- response = ClusterConnectionStatusEntity.class,
- authorizations = {
- @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
- @Authorization(value = "DFM", type = "ROLE_DFM"),
- @Authorization(value = "Admin", type = "ROLE_ADMIN")
- }
- )
- @ApiResponses(
- value = {
- @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
- @ApiResponse(code = 401, message = "Client could not be authenticated."),
- @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
- @ApiResponse(code = 404, message = "The specified resource could not be found."),
- @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
- }
- )
- public Response getConnectionStatus(
- @ApiParam(
- value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
- required = false
- )
- @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
- @ApiParam(
- value = "The connection id",
- required = true
- )
- @PathParam("id") String id) {
-
- if (properties.isClusterManager()) {
-
- final ClusterConnectionStatusDTO dto = serviceFacade.getClusterConnectionStatus(id);
-
- // create the revision
- RevisionDTO revision = new RevisionDTO();
- revision.setClientId(clientId.getClientId());
-
- // create entity
- final ClusterConnectionStatusEntity entity = new ClusterConnectionStatusEntity();
- entity.setClusterConnectionStatus(dto);
- entity.setRevision(revision);
-
- // generate the response
- return generateOkResponse(entity).build();
- }
-
- throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request.");
- }
-
- /**
- * Gets the connections status history for every node.
- *
- * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
- * @param id The id of the processor
- * @return A clusterProcessorStatusHistoryEntity
- */
- @GET
- @Consumes(MediaType.WILDCARD)
- @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON})
- @Path("/connections/{id}/status/history")
- @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
- @ApiOperation(
- value = "Gets connection status history across the cluster",
- response = ClusterStatusHistoryEntity.class,
- authorizations = {
- @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
- @Authorization(value = "DFM", type = "ROLE_DFM"),
- @Authorization(value = "Admin", type = "ROLE_ADMIN")
- }
- )
- @ApiResponses(
- value = {
- @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
- @ApiResponse(code = 401, message = "Client could not be authenticated."),
- @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
- @ApiResponse(code = 404, message = "The specified resource could not be found."),
- @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
- }
- )
- public Response getConnectionStatusHistory(
- @ApiParam(
- value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
- required = false
- )
- @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
- @ApiParam(
- value = "The connection id.",
- required = true
- )
- @PathParam("id") String id) {
-
- if (properties.isClusterManager()) {
- final ClusterStatusHistoryDTO dto = serviceFacade.getClusterConnectionStatusHistory(id);
-
- // create the revision
- RevisionDTO revision = new RevisionDTO();
- revision.setClientId(clientId.getClientId());
-
- // create entity
- final ClusterStatusHistoryEntity entity = new ClusterStatusHistoryEntity();
- entity.setClusterStatusHistory(dto);
- entity.setRevision(revision);
-
- // generate the response
- return generateOkResponse(entity).build();
- }
-
- throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request.");
- }
-
- /**
- * Gets the process group status for every node.
- *
- * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
- * @param id The id of the process group
- * @return A clusterProcessGroupStatusEntity
- */
- @GET
- @Consumes(MediaType.WILDCARD)
- @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON})
- @Path("/process-groups/{id}/status")
- @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
- @ApiOperation(
- value = "Gets process group status across the cluster",
- response = ClusterProcessGroupStatusEntity.class,
- authorizations = {
- @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
- @Authorization(value = "DFM", type = "ROLE_DFM"),
- @Authorization(value = "Admin", type = "ROLE_ADMIN")
- }
- )
- @ApiResponses(
- value = {
- @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
- @ApiResponse(code = 401, message = "Client could not be authenticated."),
- @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
- @ApiResponse(code = 404, message = "The specified resource could not be found."),
- @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
- }
- )
- public Response getProcessGroupStatus(
- @ApiParam(
- value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
- required = false
- )
- @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
- @ApiParam(
- value = "The process group id.",
- required = true
- )
- @PathParam("id") String id) {
-
- if (properties.isClusterManager()) {
-
- final ClusterProcessGroupStatusDTO dto = serviceFacade.getClusterProcessGroupStatus(id);
-
- // create the revision
- RevisionDTO revision = new RevisionDTO();
- revision.setClientId(clientId.getClientId());
-
- // create entity
- final ClusterProcessGroupStatusEntity entity = new ClusterProcessGroupStatusEntity();
- entity.setClusterProcessGroupStatus(dto);
- entity.setRevision(revision);
-
- // generate the response
- return generateOkResponse(entity).build();
- }
-
- throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request.");
- }
-
- /**
- * Gets the process group status history for every node.
- *
- * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
- * @param id The id of the process group
- * @return A clusterProcessGroupStatusHistoryEntity
- */
- @GET
- @Consumes(MediaType.WILDCARD)
- @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON})
- @Path("/process-groups/{id}/status/history")
- @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
- @ApiOperation(
- value = "Gets process group status history across the cluster",
- response = ClusterStatusHistoryEntity.class,
- authorizations = {
- @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
- @Authorization(value = "DFM", type = "ROLE_DFM"),
- @Authorization(value = "Admin", type = "ROLE_ADMIN")
- }
- )
- @ApiResponses(
- value = {
- @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
- @ApiResponse(code = 401, message = "Client could not be authenticated."),
- @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
- @ApiResponse(code = 404, message = "The specified resource could not be found."),
- @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
- }
- )
- public Response getProcessGroupStatusHistory(
- @ApiParam(
- value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
- required = false
- )
- @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
- @ApiParam(
- value = "The process group id.",
- required = true
- )
- @PathParam("id") String id) {
-
- if (properties.isClusterManager()) {
- final ClusterStatusHistoryDTO dto = serviceFacade.getClusterProcessGroupStatusHistory(id);
-
- // create the revision
- RevisionDTO revision = new RevisionDTO();
- revision.setClientId(clientId.getClientId());
-
- // create entity
- final ClusterStatusHistoryEntity entity = new ClusterStatusHistoryEntity();
- entity.setClusterStatusHistory(dto);
- entity.setRevision(revision);
-
- // generate the response
- return generateOkResponse(entity).build();
- }
-
- throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request.");
- }
-
- /**
- * Gets the remote process group status for every node.
- *
- * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
- * @param id The id of the remote process group
- * @return A clusterRemoteProcessGroupStatusEntity
- */
- @GET
- @Consumes(MediaType.WILDCARD)
- @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON})
- @Path("/remote-process-groups/{id}/status")
- @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
- @ApiOperation(
- value = "Gets remote process group status across the cluster",
- response = ClusterRemoteProcessGroupStatusEntity.class,
- authorizations = {
- @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
- @Authorization(value = "DFM", type = "ROLE_DFM"),
- @Authorization(value = "Admin", type = "ROLE_ADMIN")
- }
- )
- @ApiResponses(
- value = {
- @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
- @ApiResponse(code = 401, message = "Client could not be authenticated."),
- @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
- @ApiResponse(code = 404, message = "The specified resource could not be found."),
- @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
- }
- )
- public Response getRemoteProcessGroupStatus(
- @ApiParam(
- value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
- required = false
- )
- @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
- @ApiParam(
- value = "The remote process group id.",
- required = true
- )
- @PathParam("id") String id) {
-
- if (properties.isClusterManager()) {
-
- final ClusterRemoteProcessGroupStatusDTO dto = serviceFacade.getClusterRemoteProcessGroupStatus(id);
-
- // create the revision
- RevisionDTO revision = new RevisionDTO();
- revision.setClientId(clientId.getClientId());
-
- // create entity
- final ClusterRemoteProcessGroupStatusEntity entity = new ClusterRemoteProcessGroupStatusEntity();
- entity.setClusterRemoteProcessGroupStatus(dto);
- entity.setRevision(revision);
-
- // generate the response
- return generateOkResponse(entity).build();
- }
-
- throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request.");
- }
-
- /**
- * Gets the input port status for every node.
- *
- * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
- * @param id The id of the input port
- * @return A clusterPortStatusEntity
- */
- @GET
- @Consumes(MediaType.WILDCARD)
- @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON})
- @Path("/input-ports/{id}/status")
- @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
- @ApiOperation(
- value = "Gets input port status across the cluster",
- response = ClusterPortStatusEntity.class,
- authorizations = {
- @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
- @Authorization(value = "DFM", type = "ROLE_DFM"),
- @Authorization(value = "Admin", type = "ROLE_ADMIN")
- }
- )
- @ApiResponses(
- value = {
- @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
- @ApiResponse(code = 401, message = "Client could not be authenticated."),
- @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
- @ApiResponse(code = 404, message = "The specified resource could not be found."),
- @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
- }
- )
- public Response getInputPortStatus(
- @ApiParam(
- value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
- required = false
- )
- @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
- @ApiParam(
- value = "The input port id.",
- required = true
- )
- @PathParam("id") String id) {
-
- if (properties.isClusterManager()) {
-
- final ClusterPortStatusDTO dto = serviceFacade.getClusterInputPortStatus(id);
-
- // create the revision
- RevisionDTO revision = new RevisionDTO();
- revision.setClientId(clientId.getClientId());
-
- // create entity
- final ClusterPortStatusEntity entity = new ClusterPortStatusEntity();
- entity.setClusterPortStatus(dto);
- entity.setRevision(revision);
-
- // generate the response
- return generateOkResponse(entity).build();
- }
-
- throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request.");
- }
-
- /**
- * Gets the output port status for every node.
- *
- * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
- * @param id The id of the output port
- * @return A clusterPortStatusEntity
- */
- @GET
- @Consumes(MediaType.WILDCARD)
- @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON})
- @Path("/output-ports/{id}/status")
- @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
- @ApiOperation(
- value = "Gets output port status across the cluster",
- response = ClusterPortStatusEntity.class,
- authorizations = {
- @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
- @Authorization(value = "DFM", type = "ROLE_DFM"),
- @Authorization(value = "Admin", type = "ROLE_ADMIN")
- }
- )
- @ApiResponses(
- value = {
- @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
- @ApiResponse(code = 401, message = "Client could not be authenticated."),
- @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
- @ApiResponse(code = 404, message = "The specified resource could not be found."),
- @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
- }
- )
- public Response getOutputPortStatus(
- @ApiParam(
- value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
- required = false
- )
- @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
- @ApiParam(
- value = "The output port id.",
- required = true
- )
- @PathParam("id") String id) {
-
- if (properties.isClusterManager()) {
-
- final ClusterPortStatusDTO dto = serviceFacade.getClusterOutputPortStatus(id);
-
- // create the revision
- RevisionDTO revision = new RevisionDTO();
- revision.setClientId(clientId.getClientId());
-
- // create entity
- final ClusterPortStatusEntity entity = new ClusterPortStatusEntity();
- entity.setClusterPortStatus(dto);
- entity.setRevision(revision);
-
- // generate the response
- return generateOkResponse(entity).build();
- }
-
- throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request.");
- }
-
- /**
- * Gets the remote process group status history for every node.
- *
- * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
- * @param id The id of the processor
- * @return A clusterRemoteProcessGroupStatusHistoryEntity
- */
- @GET
- @Consumes(MediaType.WILDCARD)
- @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON})
- @Path("/remote-process-groups/{id}/status/history")
- @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
- @ApiOperation(
- value = "Gets the remote process group status history across the cluster",
- response = ClusterStatusHistoryEntity.class,
- authorizations = {
- @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
- @Authorization(value = "DFM", type = "ROLE_DFM"),
- @Authorization(value = "Admin", type = "ROLE_ADMIN")
- }
- )
- @ApiResponses(
- value = {
- @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
- @ApiResponse(code = 401, message = "Client could not be authenticated."),
- @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
- @ApiResponse(code = 404, message = "The specified resource could not be found."),
- @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
- }
- )
- public Response getRemoteProcessGroupStatusHistory(
- @ApiParam(
- value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
- required = false
- )
- @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
- @ApiParam(
- value = "The remote process group id.",
- required = true
- )
- @PathParam("id") String id) {
-
- if (properties.isClusterManager()) {
- final ClusterStatusHistoryDTO dto = serviceFacade.getClusterRemoteProcessGroupStatusHistory(id);
-
- // create the revision
- RevisionDTO revision = new RevisionDTO();
- revision.setClientId(clientId.getClientId());
-
- // create entity
- final ClusterStatusHistoryEntity entity = new ClusterStatusHistoryEntity();
- entity.setClusterStatusHistory(dto);
- entity.setRevision(revision);
-
- // generate the response
- return generateOkResponse(entity).build();
- }
-
- throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request.");
- }
-
// setters
public void setServiceFacade(NiFiServiceFacade serviceFacade) {
this.serviceFacade = serviceFacade;
http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
index 738da04..712233f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
@@ -25,6 +25,7 @@ import com.wordnik.swagger.annotations.Authorization;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.cluster.context.ClusterContext;
import org.apache.nifi.cluster.context.ClusterContextThreadLocal;
+import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
import org.apache.nifi.cluster.manager.impl.WebClusterManager;
import org.apache.nifi.cluster.node.Node;
@@ -33,7 +34,6 @@ import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.ConfigurationSnapshot;
import org.apache.nifi.web.DownloadableContent;
-import org.apache.nifi.web.IllegalClusterResourceRequestException;
import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.Revision;
import org.apache.nifi.web.api.dto.ConnectableDTO;
@@ -44,8 +44,10 @@ import org.apache.nifi.web.api.dto.FlowFileSummaryDTO;
import org.apache.nifi.web.api.dto.ListingRequestDTO;
import org.apache.nifi.web.api.dto.PositionDTO;
import org.apache.nifi.web.api.dto.RevisionDTO;
+import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO;
import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
import org.apache.nifi.web.api.entity.ConnectionEntity;
+import org.apache.nifi.web.api.entity.ConnectionStatusEntity;
import org.apache.nifi.web.api.entity.ConnectionsEntity;
import org.apache.nifi.web.api.entity.DropRequestEntity;
import org.apache.nifi.web.api.entity.FlowFileEntity;
@@ -55,8 +57,6 @@ import org.apache.nifi.web.api.request.ClientIdParameter;
import org.apache.nifi.web.api.request.ConnectableTypeParameter;
import org.apache.nifi.web.api.request.IntegerParameter;
import org.apache.nifi.web.api.request.LongParameter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.springframework.security.access.prepost.PreAuthorize;
import javax.servlet.http.HttpServletRequest;
@@ -99,8 +99,6 @@ import java.util.UUID;
@Api(hidden = true)
public class ConnectionResource extends ApplicationResource {
- private static final Logger logger = LoggerFactory.getLogger(ConnectionResource.class);
-
private NiFiServiceFacade serviceFacade;
private WebClusterManager clusterManager;
private NiFiProperties properties;
@@ -284,6 +282,106 @@ public class ConnectionResource extends ApplicationResource {
}
/**
+ * Retrieves the specified connection status.
+ *
+ * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
+ * @param id The id of the connection whose status to retrieve.
+ * @return A connectionStatusEntity.
+ */
+ @GET
+ @Consumes(MediaType.WILDCARD)
+ @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
+ @Path("/{id}/status")
+ @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
+ @ApiOperation(
+ value = "Gets status for a connection",
+ response = ConnectionStatusEntity.class,
+ authorizations = {
+ @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
+ @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"),
+ @Authorization(value = "Administrator", type = "ROLE_ADMIN")
+ }
+ )
+ @ApiResponses(
+ value = {
+ @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+ @ApiResponse(code = 401, message = "Client could not be authenticated."),
+ @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+ @ApiResponse(code = 404, message = "The specified resource could not be found."),
+ @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+ }
+ )
+ public Response getConnectionStatus(
+ @ApiParam(
+ value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
+ required = false
+ )
+ @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
+ @ApiParam(
+ value = "Whether or not to include the breakdown per node. Optional, defaults to false",
+ required = false
+ )
+ @QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise,
+ @ApiParam(
+ value = "The id of the node where to get the status.",
+ required = false
+ )
+ @QueryParam("clusterNodeId") String clusterNodeId,
+ @ApiParam(
+ value = "The connection id.",
+ required = true
+ )
+ @PathParam("id") String id) {
+
+ // ensure a valid request
+ if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) {
+ throw new IllegalArgumentException("Nodewise requests cannot be directed at a specific node.");
+ }
+
+ if (properties.isClusterManager()) {
+ // determine where this request should be sent
+ if (clusterNodeId == null) {
+ final NodeResponse nodeResponse = clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders());
+ final ConnectionStatusEntity entity = (ConnectionStatusEntity) nodeResponse.getUpdatedEntity();
+
+ // ensure there is an updated entity (result of merging) and prune the response as necessary
+ if (entity != null && !nodewise) {
+ entity.getConnectionStatus().setNodeSnapshots(null);
+ }
+
+ return nodeResponse.getResponse();
+ } else {
+ // get the target node and ensure it exists
+ final Node targetNode = clusterManager.getNode(clusterNodeId);
+ if (targetNode == null) {
+ throw new UnknownNodeException("The specified cluster node does not exist.");
+ }
+
+ final Set<NodeIdentifier> targetNodes = new HashSet<>();
+ targetNodes.add(targetNode.getNodeId());
+
+ // replicate the request to the specific node
+ return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders(), targetNodes).getResponse();
+ }
+ }
+
+ // get the specified connection status
+ final ConnectionStatusDTO connectionStatus = serviceFacade.getConnectionStatus(groupId, id);
+
+ // create the revision
+ final RevisionDTO revision = new RevisionDTO();
+ revision.setClientId(clientId.getClientId());
+
+ // generate the response entity
+ final ConnectionStatusEntity entity = new ConnectionStatusEntity();
+ entity.setRevision(revision);
+ entity.setConnectionStatus(connectionStatus);
+
+ // generate the response
+ return clusterContext(generateOkResponse(entity)).build();
+ }
+
+ /**
* Retrieves the specified connection status history.
*
* @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
@@ -327,7 +425,7 @@ public class ConnectionResource extends ApplicationResource {
// replicate if cluster manager
if (properties.isClusterManager()) {
- throw new IllegalClusterResourceRequestException("This request is only supported in standalone mode.");
+ return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
}
// get the specified processor status history
@@ -565,7 +663,7 @@ public class ConnectionResource extends ApplicationResource {
headersToOverride.put("content-type", MediaType.APPLICATION_JSON);
// replicate put request
- return (Response) clusterManager.applyRequest(HttpMethod.PUT, putUri, updateClientId(connectionEntity), getHeaders(headersToOverride)).getResponse();
+ return clusterManager.applyRequest(HttpMethod.PUT, putUri, updateClientId(connectionEntity), getHeaders(headersToOverride)).getResponse();
}
// get the connection
http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
index ef62a62..a3d0dc1 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
@@ -23,34 +23,17 @@ import com.wordnik.swagger.annotations.ApiParam;
import com.wordnik.swagger.annotations.ApiResponse;
import com.wordnik.swagger.annotations.ApiResponses;
import com.wordnik.swagger.annotations.Authorization;
-
-import java.net.URI;
-import java.util.HashMap;
-import java.util.Map;
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.FormParam;
-import javax.ws.rs.GET;
-import javax.ws.rs.HEAD;
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.POST;
-import javax.ws.rs.PUT;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
import org.apache.nifi.cluster.manager.impl.WebClusterManager;
-import org.apache.nifi.web.security.user.NiFiUserUtils;
+import org.apache.nifi.cluster.node.Node;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.user.NiFiUser;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.ConfigurationSnapshot;
-import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.IllegalClusterResourceRequestException;
+import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.Revision;
import org.apache.nifi.web.api.dto.AboutDTO;
import org.apache.nifi.web.api.dto.BannerDTO;
@@ -66,23 +49,46 @@ import org.apache.nifi.web.api.entity.AuthorityEntity;
import org.apache.nifi.web.api.entity.BannerEntity;
import org.apache.nifi.web.api.entity.ControllerConfigurationEntity;
import org.apache.nifi.web.api.entity.ControllerEntity;
+import org.apache.nifi.web.api.entity.ControllerServiceTypesEntity;
import org.apache.nifi.web.api.entity.ControllerStatusEntity;
import org.apache.nifi.web.api.entity.CounterEntity;
import org.apache.nifi.web.api.entity.CountersEntity;
import org.apache.nifi.web.api.entity.Entity;
+import org.apache.nifi.web.api.entity.IdentityEntity;
import org.apache.nifi.web.api.entity.PrioritizerTypesEntity;
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
import org.apache.nifi.web.api.entity.ProcessorTypesEntity;
+import org.apache.nifi.web.api.entity.ReportingTaskTypesEntity;
import org.apache.nifi.web.api.entity.SearchResultsEntity;
import org.apache.nifi.web.api.request.ClientIdParameter;
import org.apache.nifi.web.api.request.IntegerParameter;
import org.apache.nifi.web.api.request.LongParameter;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.web.api.entity.ControllerServiceTypesEntity;
-import org.apache.nifi.web.api.entity.IdentityEntity;
-import org.apache.nifi.web.api.entity.ReportingTaskTypesEntity;
+import org.apache.nifi.web.security.user.NiFiUserUtils;
import org.springframework.security.access.prepost.PreAuthorize;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.FormParam;
+import javax.ws.rs.GET;
+import javax.ws.rs.HEAD;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
/**
* RESTful endpoint for managing a Flow Controller.
*/
@@ -524,6 +530,10 @@ public class ControllerResource extends ApplicationResource {
)
@QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId) {
+ if (properties.isClusterManager()) {
+ return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
+ }
+
final ControllerStatusDTO controllerStatus = serviceFacade.getControllerStatus();
// create the revision
@@ -572,7 +582,50 @@ public class ControllerResource extends ApplicationResource {
value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
required = false
)
- @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId) {
+ @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
+ @ApiParam(
+ value = "Whether or not to include the breakdown per node. Optional, defaults to false",
+ required = false
+ )
+ @QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise,
+ @ApiParam(
+ value = "The id of the node where to get the status.",
+ required = false
+ )
+ @QueryParam("clusterNodeId") String clusterNodeId) {
+
+ // ensure a valid request
+ if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) {
+ throw new IllegalArgumentException("Nodewise requests cannot be directed at a specific node.");
+ }
+
+ // replicate if cluster manager
+ if (properties.isClusterManager()) {
+ // determine where this request should be sent
+ if (clusterNodeId == null) {
+ final NodeResponse nodeResponse = clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders());
+ final CountersEntity entity = (CountersEntity) nodeResponse.getUpdatedEntity();
+
+ // ensure there is an updated entity (result of merging) and prune the response as necessary
+ if (entity != null && !nodewise) {
+ entity.getCounters().setNodeSnapshots(null);
+ }
+
+ return nodeResponse.getResponse();
+ } else {
+ // get the target node and ensure it exists
+ final Node targetNode = clusterManager.getNode(clusterNodeId);
+ if (targetNode == null) {
+ throw new UnknownNodeException("The specified cluster node does not exist.");
+ }
+
+ final Set<NodeIdentifier> targetNodes = new HashSet<>();
+ targetNodes.add(targetNode.getNodeId());
+
+ // replicate the request to the specific node
+ return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders(), targetNodes).getResponse();
+ }
+ }
final CountersDTO countersReport = serviceFacade.getCounters();
http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java
index d2be69d..2f7eed6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java
@@ -22,12 +22,29 @@ import com.wordnik.swagger.annotations.ApiParam;
import com.wordnik.swagger.annotations.ApiResponse;
import com.wordnik.swagger.annotations.ApiResponses;
import com.wordnik.swagger.annotations.Authorization;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
+import org.apache.nifi.cluster.manager.impl.WebClusterManager;
+import org.apache.nifi.cluster.node.Node;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.web.ConfigurationSnapshot;
+import org.apache.nifi.web.NiFiServiceFacade;
+import org.apache.nifi.web.Revision;
+import org.apache.nifi.web.api.dto.PortDTO;
+import org.apache.nifi.web.api.dto.PositionDTO;
+import org.apache.nifi.web.api.dto.RevisionDTO;
+import org.apache.nifi.web.api.dto.status.PortStatusDTO;
+import org.apache.nifi.web.api.entity.InputPortEntity;
+import org.apache.nifi.web.api.entity.InputPortsEntity;
+import org.apache.nifi.web.api.entity.PortStatusEntity;
+import org.apache.nifi.web.api.request.ClientIdParameter;
+import org.apache.nifi.web.api.request.DoubleParameter;
+import org.apache.nifi.web.api.request.IntegerParameter;
+import org.apache.nifi.web.api.request.LongParameter;
+import org.springframework.security.access.prepost.PreAuthorize;
+
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
@@ -46,24 +63,13 @@ import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
-import org.apache.nifi.cluster.manager.impl.WebClusterManager;
-import org.apache.nifi.util.NiFiProperties;
-import org.apache.nifi.web.ConfigurationSnapshot;
-import org.apache.nifi.web.NiFiServiceFacade;
-import org.apache.nifi.web.Revision;
-import org.apache.nifi.web.api.dto.PortDTO;
-import org.apache.nifi.web.api.dto.PositionDTO;
-import org.apache.nifi.web.api.dto.RevisionDTO;
-import org.apache.nifi.web.api.entity.InputPortEntity;
-import org.apache.nifi.web.api.entity.InputPortsEntity;
-import org.apache.nifi.web.api.request.ClientIdParameter;
-import org.apache.nifi.web.api.request.DoubleParameter;
-import org.apache.nifi.web.api.request.IntegerParameter;
-import org.apache.nifi.web.api.request.LongParameter;
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.security.access.prepost.PreAuthorize;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
/**
* RESTful endpoint for managing an Input Port.
@@ -71,8 +77,6 @@ import org.springframework.security.access.prepost.PreAuthorize;
@Api(hidden = true)
public class InputPortResource extends ApplicationResource {
- private static final Logger logger = LoggerFactory.getLogger(InputPortResource.class);
-
private NiFiServiceFacade serviceFacade;
private WebClusterManager clusterManager;
private NiFiProperties properties;
@@ -276,7 +280,7 @@ public class InputPortResource extends ApplicationResource {
headersToOverride.put("content-type", MediaType.APPLICATION_JSON);
// replicate put request
- return (Response) clusterManager.applyRequest(HttpMethod.PUT, putUri, updateClientId(portEntity), getHeaders(headersToOverride)).getResponse();
+ return clusterManager.applyRequest(HttpMethod.PUT, putUri, updateClientId(portEntity), getHeaders(headersToOverride)).getResponse();
}
@@ -370,6 +374,106 @@ public class InputPortResource extends ApplicationResource {
}
/**
+ * Retrieves the specified input port status.
+ *
+ * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
+ * @param id The id of the input port whose status to retrieve.
+ * @return A portStatusEntity.
+ */
+ @GET
+ @Consumes(MediaType.WILDCARD)
+ @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
+ @Path("/{id}/status")
+ @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
+ @ApiOperation(
+ value = "Gets status for an input port",
+ response = PortStatusEntity.class,
+ authorizations = {
+ @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
+ @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"),
+ @Authorization(value = "Administrator", type = "ROLE_ADMIN")
+ }
+ )
+ @ApiResponses(
+ value = {
+ @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+ @ApiResponse(code = 401, message = "Client could not be authenticated."),
+ @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+ @ApiResponse(code = 404, message = "The specified resource could not be found."),
+ @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+ }
+ )
+ public Response getInputPortStatus(
+ @ApiParam(
+ value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
+ required = false
+ )
+ @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
+ @ApiParam(
+ value = "Whether or not to include the breakdown per node. Optional, defaults to false",
+ required = false
+ )
+ @QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise,
+ @ApiParam(
+ value = "The id of the node where to get the status.",
+ required = false
+ )
+ @QueryParam("clusterNodeId") String clusterNodeId,
+ @ApiParam(
+ value = "The input port id.",
+ required = true
+ )
+ @PathParam("id") String id) {
+
+ // ensure a valid request
+ if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) {
+ throw new IllegalArgumentException("Nodewise requests cannot be directed at a specific node.");
+ }
+
+ if (properties.isClusterManager()) {
+ // determine where this request should be sent
+ if (clusterNodeId == null) {
+ final NodeResponse nodeResponse = clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders());
+ final PortStatusEntity entity = (PortStatusEntity) nodeResponse.getUpdatedEntity();
+
+ // ensure there is an updated entity (result of merging) and prune the response as necessary
+ if (entity != null && !nodewise) {
+ entity.getPortStatus().setNodeSnapshots(null);
+ }
+
+ return nodeResponse.getResponse();
+ } else {
+ // get the target node and ensure it exists
+ final Node targetNode = clusterManager.getNode(clusterNodeId);
+ if (targetNode == null) {
+ throw new UnknownNodeException("The specified cluster node does not exist.");
+ }
+
+ final Set<NodeIdentifier> targetNodes = new HashSet<>();
+ targetNodes.add(targetNode.getNodeId());
+
+ // replicate the request to the specific node
+ return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders(), targetNodes).getResponse();
+ }
+ }
+
+ // get the specified input port status
+ final PortStatusDTO portStatus = serviceFacade.getInputPortStatus(groupId, id);
+
+ // create the revision
+ final RevisionDTO revision = new RevisionDTO();
+ revision.setClientId(clientId.getClientId());
+
+ // generate the response entity
+ final PortStatusEntity entity = new PortStatusEntity();
+ entity.setRevision(revision);
+ entity.setPortStatus(portStatus);
+
+ // generate the response
+ return clusterContext(generateOkResponse(entity)).build();
+ }
+
+ /**
* Updates the specified input port.
*
* @param httpServletRequest request
http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/NodeResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/NodeResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/NodeResource.java
index c88cc68..d3eb77a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/NodeResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/NodeResource.java
@@ -22,6 +22,16 @@ import com.wordnik.swagger.annotations.ApiParam;
import com.wordnik.swagger.annotations.ApiResponse;
import com.wordnik.swagger.annotations.ApiResponses;
import com.wordnik.swagger.annotations.Authorization;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.web.IllegalClusterResourceRequestException;
+import org.apache.nifi.web.NiFiServiceFacade;
+import org.apache.nifi.web.api.dto.NodeDTO;
+import org.apache.nifi.web.api.dto.RevisionDTO;
+import org.apache.nifi.web.api.entity.NodeEntity;
+import org.apache.nifi.web.api.request.ClientIdParameter;
+import org.springframework.security.access.prepost.PreAuthorize;
+
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
@@ -34,19 +44,6 @@ import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
-import org.apache.nifi.util.NiFiProperties;
-import org.apache.nifi.web.api.dto.NodeDTO;
-import org.apache.nifi.web.api.entity.NodeEntity;
-import org.apache.nifi.web.api.request.ClientIdParameter;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.web.IllegalClusterResourceRequestException;
-import org.apache.nifi.web.NiFiServiceFacade;
-import org.apache.nifi.web.api.dto.NodeSystemDiagnosticsDTO;
-import org.apache.nifi.web.api.dto.RevisionDTO;
-import org.apache.nifi.web.api.dto.status.NodeStatusDTO;
-import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity;
-import org.apache.nifi.web.api.entity.SystemDiagnosticsEntity;
-import org.springframework.security.access.prepost.PreAuthorize;
/**
* RESTful endpoint for managing a cluster connection.
@@ -121,129 +118,6 @@ public class NodeResource extends ApplicationResource {
throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request.");
}
- /**
- * Gets the status for the specified node.
- *
- * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
- * @param id The id of the node.
- * @return A processGroupStatusEntity
- */
- @GET
- @Consumes(MediaType.WILDCARD)
- @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
- @Path("/{id}/status")
- @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
- @ApiOperation(
- value = "Gets process group status for a node in the cluster",
- response = ProcessGroupStatusEntity.class,
- authorizations = {
- @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
- @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"),
- @Authorization(value = "Administrator", type = "ROLE_ADMIN")
- }
- )
- @ApiResponses(
- value = {
- @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
- @ApiResponse(code = 401, message = "Client could not be authenticated."),
- @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
- @ApiResponse(code = 404, message = "The specified resource could not be found."),
- @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
- }
- )
- public Response getNodeStatus(
- @ApiParam(
- value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
- required = false
- )
- @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
- @ApiParam(
- value = "The node id.",
- required = true
- )
- @PathParam("id") String id) {
-
- if (properties.isClusterManager()) {
- // get the node statistics
- final NodeStatusDTO nodeStatus = serviceFacade.getNodeStatus(id);
-
- // create the revision
- final RevisionDTO revision = new RevisionDTO();
- revision.setClientId(clientId.getClientId());
-
- // create the node statics entity
- final ProcessGroupStatusEntity entity = new ProcessGroupStatusEntity();
- entity.setRevision(revision);
- entity.setProcessGroupStatus(nodeStatus.getControllerStatus());
-
- // generate the response
- return generateOkResponse(entity).build();
- }
-
- throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request.");
- }
-
- /**
- * Gets the system diagnositics for the specified node.
- *
- * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
- * @param id The id of the node.
- * @return A systemDiagnosticsEntity
- */
- @GET
- @Consumes(MediaType.WILDCARD)
- @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
- @Path("/{id}/system-diagnostics")
- @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
- @ApiOperation(
- value = "Gets system diagnostics for a node in the cluester",
- response = SystemDiagnosticsEntity.class,
- authorizations = {
- @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
- @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"),
- @Authorization(value = "Administrator", type = "ROLE_ADMIN")
- }
- )
- @ApiResponses(
- value = {
- @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
- @ApiResponse(code = 401, message = "Client could not be authenticated."),
- @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
- @ApiResponse(code = 404, message = "The specified resource could not be found."),
- @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
- }
- )
- public Response getNodeSystemDiagnostics(
- @ApiParam(
- value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
- required = false
- )
- @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
- @ApiParam(
- value = "The node id.",
- required = true
- )
- @PathParam("id") String id) {
-
- if (properties.isClusterManager()) {
- // get the node statistics
- final NodeSystemDiagnosticsDTO nodeSystemDiagnostics = serviceFacade.getNodeSystemDiagnostics(id);
-
- // create the revision
- final RevisionDTO revision = new RevisionDTO();
- revision.setClientId(clientId.getClientId());
-
- // create the node statics entity
- final SystemDiagnosticsEntity entity = new SystemDiagnosticsEntity();
- entity.setRevision(revision);
- entity.setSystemDiagnostics(nodeSystemDiagnostics.getSystemDiagnostics());
-
- // generate the response
- return generateOkResponse(entity).build();
- }
-
- throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request.");
- }
/**
* Updates the contents of the specified node in this NiFi cluster.
http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java
index ccd08db..e76fcf0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java
@@ -25,6 +25,7 @@ import com.wordnik.swagger.annotations.Authorization;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
@@ -46,7 +47,12 @@ import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
+
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
import org.apache.nifi.cluster.manager.impl.WebClusterManager;
+import org.apache.nifi.cluster.node.Node;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.ConfigurationSnapshot;
import org.apache.nifi.web.NiFiServiceFacade;
@@ -54,8 +60,10 @@ import org.apache.nifi.web.Revision;
import org.apache.nifi.web.api.dto.PortDTO;
import org.apache.nifi.web.api.dto.PositionDTO;
import org.apache.nifi.web.api.dto.RevisionDTO;
+import org.apache.nifi.web.api.dto.status.PortStatusDTO;
import org.apache.nifi.web.api.entity.OutputPortEntity;
import org.apache.nifi.web.api.entity.OutputPortsEntity;
+import org.apache.nifi.web.api.entity.PortStatusEntity;
import org.apache.nifi.web.api.request.ClientIdParameter;
import org.apache.nifi.web.api.request.DoubleParameter;
import org.apache.nifi.web.api.request.IntegerParameter;
@@ -370,6 +378,106 @@ public class OutputPortResource extends ApplicationResource {
}
/**
+ * Retrieves the status of the specified output port.
+ *
+ * When this instance is the cluster manager the request is replicated to the cluster (or to the single node
+ * identified by clusterNodeId); otherwise the status is obtained from the local service facade.
+ *
+ * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
+ * @param nodewise Whether or not to include the breakdown per node. Cannot be combined with clusterNodeId.
+ * @param clusterNodeId Optional id of the cluster node from which to get the status. Cannot be combined with a nodewise request.
+ * @param id The id of the output port whose status is to be retrieved.
+ * @return A portStatusEntity.
+ */
+ @GET
+ @Consumes(MediaType.WILDCARD)
+ @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
+ @Path("/{id}/status")
+ @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')")
+ @ApiOperation(
+ value = "Gets status for an output port",
+ response = PortStatusEntity.class,
+ authorizations = {
+ @Authorization(value = "Read Only", type = "ROLE_MONITOR"),
+ @Authorization(value = "Data Flow Manager", type = "ROLE_DFM"),
+ @Authorization(value = "Administrator", type = "ROLE_ADMIN")
+ }
+ )
+ @ApiResponses(
+ value = {
+ @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+ @ApiResponse(code = 401, message = "Client could not be authenticated."),
+ @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+ @ApiResponse(code = 404, message = "The specified resource could not be found."),
+ @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+ }
+ )
+ public Response getOutputPortStatus(
+ @ApiParam(
+ value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
+ required = false
+ )
+ @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
+ @ApiParam(
+ value = "Whether or not to include the breakdown per node. Optional, defaults to false",
+ required = false
+ )
+ @QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise,
+ @ApiParam(
+ value = "The id of the node where to get the status.",
+ required = false
+ )
+ @QueryParam("clusterNodeId") String clusterNodeId,
+ @ApiParam(
+ value = "The output port id.",
+ required = true
+ )
+ @PathParam("id") String id) {
+
+ // ensure a valid request: a per-node breakdown cannot be requested from one specific node
+ if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) {
+ throw new IllegalArgumentException("Nodewise requests cannot be directed at a specific node.");
+ }
+
+ if (properties.isClusterManager()) {
+ // determine where this request should be sent (no target node means the request is not restricted to a single node)
+ if (clusterNodeId == null) {
+ final NodeResponse nodeResponse = clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders());
+ final PortStatusEntity entity = (PortStatusEntity) nodeResponse.getUpdatedEntity();
+
+ // ensure there is an updated entity (result of merging) and drop the per-node snapshots unless a nodewise breakdown was requested
+ if (entity != null && !nodewise) {
+ entity.getPortStatus().setNodeSnapshots(null);
+ }
+
+ return nodeResponse.getResponse();
+ } else {
+ // get the target node and ensure it exists
+ final Node targetNode = clusterManager.getNode(clusterNodeId);
+ if (targetNode == null) {
+ throw new UnknownNodeException("The specified cluster node does not exist.");
+ }
+
+ final Set<NodeIdentifier> targetNodes = new HashSet<>();
+ targetNodes.add(targetNode.getNodeId());
+
+ // replicate the request to the specific node
+ return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders(), targetNodes).getResponse();
+ }
+ }
+
+ // not the cluster manager: get the specified output port status from the service facade
+ final PortStatusDTO portStatus = serviceFacade.getOutputPortStatus(groupId, id);
+
+ // create the revision, carrying the client id through to the response
+ final RevisionDTO revision = new RevisionDTO();
+ revision.setClientId(clientId.getClientId());
+
+ // generate the response entity
+ final PortStatusEntity entity = new PortStatusEntity();
+ entity.setRevision(revision);
+ entity.setPortStatus(portStatus);
+
+ // generate the response
+ return clusterContext(generateOkResponse(entity)).build();
+ }
+
+ /**
* Updates the specified output port.
*
* @param httpServletRequest request