You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/12/17 19:03:58 UTC
nifi git commit: NIFI-108: - Starting to add support for endpoints
that will list flowfiles in a queue.
Repository: nifi
Updated Branches:
refs/heads/NIFI-108 2e22954cd -> e762d3c7d
NIFI-108:
- Starting to add support for endpoints that will list flowfiles in a queue.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e762d3c7
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e762d3c7
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e762d3c7
Branch: refs/heads/NIFI-108
Commit: e762d3c7de8f16255250a73470a6338b73abcd4c
Parents: 2e22954
Author: Matt Gilman <ma...@gmail.com>
Authored: Thu Dec 17 13:03:25 2015 -0500
Committer: Matt Gilman <ma...@gmail.com>
Committed: Thu Dec 17 13:03:25 2015 -0500
----------------------------------------------------------------------
.../apache/nifi/web/api/dto/DropRequestDTO.java | 11 +-
.../nifi/web/api/dto/FlowFileSummaryDTO.java | 134 +++++++++
.../nifi/web/api/dto/ListingRequestDTO.java | 169 +++++++++++
.../web/api/entity/ListingRequestEntity.java | 44 +++
.../org/apache/nifi/web/NiFiServiceFacade.java | 31 ++
.../nifi/web/StandardNiFiServiceFacade.java | 18 +-
.../apache/nifi/web/api/ConnectionResource.java | 301 ++++++++++++++++++-
.../org/apache/nifi/web/api/dto/DtoFactory.java | 46 +++
.../org/apache/nifi/web/dao/ConnectionDAO.java | 33 +-
.../web/dao/impl/StandardConnectionDAO.java | 36 ++-
10 files changed, 807 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/e762d3c7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/DropRequestDTO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/DropRequestDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/DropRequestDTO.java
index c0b94a1..0cf48f3 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/DropRequestDTO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/DropRequestDTO.java
@@ -51,12 +51,12 @@ public class DropRequestDTO {
private String state;
/**
- * The id for this component.
+ * The id for this drop request.
*
* @return The id
*/
@ApiModelProperty(
- value = "The id of the component."
+ value = "The id for this drop request."
)
public String getId() {
return this.id;
@@ -67,12 +67,12 @@ public class DropRequestDTO {
}
/**
- * The uri for linking to this component in this NiFi.
+ * The uri for linking to this drop request in this NiFi.
*
* @return The uri
*/
@ApiModelProperty(
- value = "The URI for futures requests to the component."
+ value = "The URI for futures requests to this drop request."
)
public String getUri() {
return uri;
@@ -128,6 +128,9 @@ public class DropRequestDTO {
/**
* @return the reason, if any, that this drop request failed
*/
+ @ApiModelProperty(
+ value = "The reason, if any, that this drop request failed."
+ )
public String getFailureReason() {
return failureReason;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/e762d3c7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/FlowFileSummaryDTO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/FlowFileSummaryDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/FlowFileSummaryDTO.java
new file mode 100644
index 0000000..accb512
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/FlowFileSummaryDTO.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.dto;
+
+import com.wordnik.swagger.annotations.ApiModelProperty;
+import org.apache.nifi.web.api.dto.util.TimestampAdapter;
+
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+import java.util.Date;
+
+/**
+ * Data transfer object summarizing a single FlowFile: its uuid and filename, its position in the queue, its size,
+ * two timestamps and a penalization flag. Every property is a nullable wrapper or reference type with a plain
+ * getter/setter pair; the two Date getters are adapted with TimestampAdapter.
+ */
+public class FlowFileSummaryDTO {
+
+ private String uuid;
+ private String filename;
+ private Integer position;
+ private Long size;
+ private Date lastQueuedTime;
+ // NOTE(review): "linage" looks like a misspelling of "lineage". The accessors below carry the same spelling, so
+ // renaming them would presumably change the serialized property name - confirm before fixing.
+ private Date linageStartDate;
+ private Boolean isPenalized;
+
+ /**
+ * @return the FlowFile uuid
+ */
+ @ApiModelProperty(
+ value = "The FlowFile UUID."
+ )
+ public String getUuid() {
+ return uuid;
+ }
+
+ public void setUuid(String uuid) {
+ this.uuid = uuid;
+ }
+
+ /**
+ * @return the FlowFile filename
+ */
+ @ApiModelProperty(
+ value = "The FlowFile filename."
+ )
+ public String getFilename() {
+ return filename;
+ }
+
+ public void setFilename(String filename) {
+ this.filename = filename;
+ }
+
+ /**
+ * @return the FlowFile's position in the queue.
+ */
+ @ApiModelProperty(
+ value = "The FlowFile's position in the queue."
+ )
+ public Integer getPosition() {
+ return position;
+ }
+
+ public void setPosition(Integer position) {
+ this.position = position;
+ }
+
+ /**
+ * @return the FlowFile file size
+ */
+ @ApiModelProperty(
+ value = "The FlowFile file size."
+ )
+ public Long getSize() {
+ return size;
+ }
+
+ public void setSize(Long size) {
+ this.size = size;
+ }
+
+ /**
+ * @return when the FlowFile was last added to the queue
+ */
+ @XmlJavaTypeAdapter(TimestampAdapter.class)
+ @ApiModelProperty(
+ value = "When the FlowFile was last added to the queue."
+ )
+ public Date getLastQueuedTime() {
+ return lastQueuedTime;
+ }
+
+ public void setLastQueuedTime(Date lastQueuedTime) {
+ this.lastQueuedTime = lastQueuedTime;
+ }
+
+ /**
+ * @return when the FlowFile's greatest ancestor entered the flow
+ */
+ @XmlJavaTypeAdapter(TimestampAdapter.class)
+ @ApiModelProperty(
+ value = "When the FlowFile's greatest ancestor entered the flow."
+ )
+ public Date getLinageStartDate() {
+ return linageStartDate;
+ }
+
+ public void setLinageStartDate(Date linageStartDate) {
+ this.linageStartDate = linageStartDate;
+ }
+
+ /**
+ * @return if the FlowFile is penalized
+ */
+ @ApiModelProperty(
+ value = "If the FlowFile is penalized."
+ )
+ public Boolean getPenalized() {
+ return isPenalized;
+ }
+
+ // the backing field is named isPenalized, while the bean property exposed by the accessors is "penalized"
+ public void setPenalized(Boolean penalized) {
+ isPenalized = penalized;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/e762d3c7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ListingRequestDTO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ListingRequestDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ListingRequestDTO.java
new file mode 100644
index 0000000..53c2a74
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ListingRequestDTO.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.dto;
+
+import com.wordnik.swagger.annotations.ApiModelProperty;
+import org.apache.nifi.web.api.dto.util.TimestampAdapter;
+
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * Data transfer object describing a request to list the FlowFiles in a queue: the request's id and URI, when it was
+ * submitted and last updated, its progress (percent completed, finished flag, state, failure reason) and the
+ * resulting FlowFile summaries.
+ */
+public class ListingRequestDTO {
+
+ private String id;
+ private String uri;
+
+ private Date submissionTime;
+ private Date lastUpdated;
+
+ private Integer percentCompleted;
+ private Boolean finished;
+ private String failureReason;
+
+ private String state;
+
+ private List<FlowFileSummaryDTO> flowFileSummaries;
+
+ /**
+ * @return the id for this listing request.
+ */
+ @ApiModelProperty(
+ value = "The id for this listing request."
+ )
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ /**
+ * @return the URI for future requests to this listing request.
+ */
+ @ApiModelProperty(
+ value = "The URI for future requests to this listing request."
+ )
+ public String getUri() {
+ return uri;
+ }
+
+ public void setUri(String uri) {
+ this.uri = uri;
+ }
+
+ /**
+ * @return the time this listing request was submitted
+ */
+ @XmlJavaTypeAdapter(TimestampAdapter.class)
+ @ApiModelProperty(
+ value = "The timestamp when the listing request was submitted."
+ )
+ public Date getSubmissionTime() {
+ return submissionTime;
+ }
+
+ public void setSubmissionTime(Date submissionTime) {
+ this.submissionTime = submissionTime;
+ }
+
+ /**
+ * @return the time this request was last updated
+ */
+ @XmlJavaTypeAdapter(TimestampAdapter.class)
+ @ApiModelProperty(
+ value = "The last time this listing request was updated."
+ )
+ public Date getLastUpdated() {
+ return lastUpdated;
+ }
+
+ public void setLastUpdated(Date lastUpdated) {
+ this.lastUpdated = lastUpdated;
+ }
+
+ /**
+ * @return percent completed
+ */
+ @ApiModelProperty(
+ value = "The current percent complete."
+ )
+ public Integer getPercentCompleted() {
+ return percentCompleted;
+ }
+
+ public void setPercentCompleted(Integer percentCompleted) {
+ this.percentCompleted = percentCompleted;
+ }
+
+ /**
+ * @return whether this listing request has finished
+ */
+ @ApiModelProperty(
+ value = "Whether the listing request has finished."
+ )
+ public Boolean getFinished() {
+ return finished;
+ }
+
+ public void setFinished(Boolean finished) {
+ this.finished = finished;
+ }
+
+ /**
+ * @return the reason, if any, that this listing request failed
+ */
+ @ApiModelProperty(
+ value = "The reason, if any, that this listing request failed."
+ )
+ public String getFailureReason() {
+ return failureReason;
+ }
+
+ public void setFailureReason(String failureReason) {
+ this.failureReason = failureReason;
+ }
+
+ /**
+ * @return the current state of the listing request.
+ */
+ @ApiModelProperty(
+ value = "The current state of the listing request."
+ )
+ public String getState() {
+ return state;
+ }
+
+ public void setState(String state) {
+ this.state = state;
+ }
+
+ /**
+ * @return the FlowFile summaries; per the API description these are populated once the request has completed.
+ */
+ @ApiModelProperty(
+ value = "The FlowFile summaries. The summaries will be populated once the request has completed."
+ )
+ public List<FlowFileSummaryDTO> getFlowFileSummaries() {
+ return flowFileSummaries;
+ }
+
+ public void setFlowFileSummaries(List<FlowFileSummaryDTO> flowFileSummaries) {
+ this.flowFileSummaries = flowFileSummaries;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/e762d3c7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ListingRequestEntity.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ListingRequestEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ListingRequestEntity.java
new file mode 100644
index 0000000..5fee5c9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ListingRequestEntity.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.entity;
+
+import org.apache.nifi.web.api.dto.ListingRequestDTO;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * A serialized representation of this class can be placed in the entity body of a request or response to or from the API. This particular entity holds a reference to a ListingRequestDTO.
+ * The XML root element name is "listingRequestEntity".
+ */
+@XmlRootElement(name = "listingRequestEntity")
+public class ListingRequestEntity extends Entity {
+
+ private ListingRequestDTO listingRequest;
+
+ /**
+ * The ListingRequestDTO that is being serialized.
+ *
+ * @return The ListingRequestDTO object
+ */
+ public ListingRequestDTO getListingRequest() {
+ return listingRequest;
+ }
+
+ /**
+ * Sets the ListingRequestDTO carried by this entity.
+ *
+ * @param listingRequest The ListingRequestDTO object
+ */
+ public void setListingRequest(ListingRequestDTO listingRequest) {
+ this.listingRequest = listingRequest;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/e762d3c7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index 73d76bd..ef5f202 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -36,6 +36,7 @@ import org.apache.nifi.web.api.dto.DocumentedTypeDTO;
import org.apache.nifi.web.api.dto.FlowSnippetDTO;
import org.apache.nifi.web.api.dto.FunnelDTO;
import org.apache.nifi.web.api.dto.LabelDTO;
+import org.apache.nifi.web.api.dto.ListingRequestDTO;
import org.apache.nifi.web.api.dto.NodeDTO;
import org.apache.nifi.web.api.dto.NodeSystemDiagnosticsDTO;
import org.apache.nifi.web.api.dto.PortDTO;
@@ -556,6 +557,36 @@ public interface NiFiServiceFacade {
*/
DropRequestDTO deleteFlowFileDropRequest(String groupId, String connectionId, String dropRequestId);
+ /**
+ * Creates a new flow file listing request for the specified connection.
+ *
+ * @param groupId The ID of the process group
+ * @param connectionId The ID of the connection
+ * @param listingRequestId The ID of the listing request, supplied by the caller
+ * @return A DTO describing the listing request
+ */
+ ListingRequestDTO createFlowFileListingRequest(String groupId, String connectionId, String listingRequestId);
+
+ /**
+ * Gets the flow file listing request with the specified ID.
+ *
+ * @param groupId The ID of the process group
+ * @param connectionId The ID of the connection
+ * @param listingRequestId The ID of the listing request
+ * @return A DTO describing the listing request
+ */
+ ListingRequestDTO getFlowFileListingRequest(String groupId, String connectionId, String listingRequestId);
+
+ /**
+ * Deletes the flow file listing request with the specified ID.
+ *
+ * @param groupId The ID of the process group
+ * @param connectionId The ID of the connection
+ * @param listingRequestId The ID of the listing request
+ * @return A DTO describing the listing request
+ */
+ ListingRequestDTO deleteFlowFileListingRequest(String groupId, String connectionId, String listingRequestId);
+
// ----------------------------------------
// InputPort methods
// ----------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/e762d3c7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index e7a3328..8b0ef37 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -80,6 +80,7 @@ import org.apache.nifi.remote.RootGroupPort;
import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.BulletinQuery;
import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.web.api.dto.ListingRequestDTO;
import org.apache.nifi.web.security.user.NiFiUserUtils;
import org.apache.nifi.user.AccountStatus;
import org.apache.nifi.user.NiFiUser;
@@ -817,6 +818,11 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
+ public ListingRequestDTO deleteFlowFileListingRequest(String groupId, String connectionId, String listingRequestId) {
+ return dtoFactory.createListingRequestDTO(connectionDAO.deleteFlowFileListingRequest(groupId, connectionId, listingRequestId));
+ }
+
+ @Override
public ConfigurationSnapshot<Void> deleteProcessor(final Revision revision, final String groupId, final String processorId) {
return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
@Override
@@ -1069,7 +1075,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
@Override
public DropRequestDTO createFlowFileDropRequest(String groupId, String connectionId, String dropRequestId) {
- return dtoFactory.createDropRequestDTO(connectionDAO.createFileFlowDropRequest(groupId, connectionId, dropRequestId));
+ return dtoFactory.createDropRequestDTO(connectionDAO.createFlowFileDropRequest(groupId, connectionId, dropRequestId));
+ }
+
+ @Override
+ public ListingRequestDTO createFlowFileListingRequest(String groupId, String connectionId, String listingRequestId) {
+ return dtoFactory.createListingRequestDTO(connectionDAO.createFlowFileListingRequest(groupId, connectionId, listingRequestId));
}
@Override
@@ -2127,6 +2138,11 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
+ public ListingRequestDTO getFlowFileListingRequest(String groupId, String connectionId, String listingRequestId) {
+ return dtoFactory.createListingRequestDTO(connectionDAO.getFlowFileListingRequest(groupId, connectionId, listingRequestId));
+ }
+
+ @Override
public StatusHistoryDTO getConnectionStatusHistory(String groupId, String connectionId) {
return controllerFacade.getConnectionStatusHistory(groupId, connectionId);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/e762d3c7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
index 6741348..8ee765c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
@@ -58,14 +58,15 @@ import org.apache.nifi.web.ConfigurationSnapshot;
import org.apache.nifi.web.IllegalClusterResourceRequestException;
import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.Revision;
-import static org.apache.nifi.web.api.ApplicationResource.CLIENT_ID;
import org.apache.nifi.web.api.dto.ConnectableDTO;
import org.apache.nifi.web.api.dto.ConnectionDTO;
+import org.apache.nifi.web.api.dto.ListingRequestDTO;
import org.apache.nifi.web.api.dto.PositionDTO;
import org.apache.nifi.web.api.dto.RevisionDTO;
import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
import org.apache.nifi.web.api.entity.ConnectionEntity;
import org.apache.nifi.web.api.entity.ConnectionsEntity;
+import org.apache.nifi.web.api.entity.ListingRequestEntity;
import org.apache.nifi.web.api.entity.StatusHistoryEntity;
import org.apache.nifi.web.api.request.ClientIdParameter;
import org.apache.nifi.web.api.request.ConnectableTypeParameter;
@@ -839,7 +840,7 @@ public class ConnectionResource extends ApplicationResource {
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
- public Response deleteRelationshipTarget(
+ public Response deleteConnection(
@Context HttpServletRequest httpServletRequest,
@ApiParam(
value = "The revision is used to verify the client is working with the latest version of the flow.",
@@ -891,8 +892,25 @@ public class ConnectionResource extends ApplicationResource {
return clusterContext(generateOkResponse(entity)).build();
}
+ /**
+ * Placeholder mapped to GET /{connection-id}/flowfiles/{flow-file-uuid}. Not implemented: the method binds none of
+ * the path parameters and always returns null.
+ *
+ * NOTE(review): no @PreAuthorize is declared here, unlike the listing-request endpoints of this resource - confirm
+ * the intended authorization before this is implemented.
+ *
+ * @return currently always null
+ */
+ @GET
+ @Consumes(MediaType.WILDCARD)
+ @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
+ @Path("/{connection-id}/flowfiles/{flow-file-uuid}")
+ public Response getFlowFile() {
+ return null;
+ }
+
+ /**
+ * Placeholder mapped to DELETE /{connection-id}/flowfiles/{flow-file-uuid}. Not implemented: the method binds none
+ * of the path parameters and always returns null.
+ *
+ * NOTE(review): no @PreAuthorize is declared here, unlike the listing-request endpoints of this resource - confirm
+ * the intended authorization before this is implemented.
+ *
+ * @return currently always null
+ */
+ @DELETE
+ @Consumes(MediaType.WILDCARD)
+ @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
+ @Path("/{connection-id}/flowfiles/{flow-file-uuid}")
+ public Response deleteFlowFile() {
+ return null;
+ }
+
/**
- * Drops the flowfiles in the queue of the specified connection.
+ * Drops the flowfiles in the queue of the specified connection. This endpoint is DEPRECATED. Please use
+ * POST /nifi-api/controller/process-groups/{process-group-id}/connections/{connection-id}/drop-requests instead.
*
* @param httpServletRequest request
* @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
@@ -906,6 +924,7 @@ public class ConnectionResource extends ApplicationResource {
@PreAuthorize("hasRole('ROLE_DFM')")
@ApiOperation(
value = "Drops the contents of the queue in this connection.",
+ notes = "This endpoint is DEPRECATED. Please use POST /nifi-api/controller/process-groups/{process-group-id}/connections/{connection-id}/drop-requests instead.",
response = DropRequestEntity.class,
authorizations = {
@Authorization(value = "Data Flow Manager", type = "ROLE_DFM")
@@ -913,6 +932,7 @@ public class ConnectionResource extends ApplicationResource {
)
@ApiResponses(
value = {
+ @ApiResponse(code = 202, message = "The request has been accepted. A HTTP response header will contain the URI where the response can be polled."),
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@@ -920,6 +940,7 @@ public class ConnectionResource extends ApplicationResource {
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
+ @Deprecated
public Response dropQueueContents(
@Context HttpServletRequest httpServletRequest,
@ApiParam(
@@ -933,6 +954,272 @@ public class ConnectionResource extends ApplicationResource {
)
@PathParam("connection-id") String id) {
+ // defer to the new endpoint that references /drop-requests in the URI
+ return createDropRequest(httpServletRequest, clientId, id);
+ }
+
+ /**
+ * Creates a request to list the flowfiles in the queue of the specified connection. Responds with 202 Accepted and
+ * a Location header holding the URI of the new listing request.
+ *
+ * @param httpServletRequest request
+ * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
+ * @param id The id of the connection
+ * @return A listingRequestEntity
+ */
+ @POST
+ @Consumes(MediaType.WILDCARD)
+ @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
+ @Path("/{connection-id}/listing-requests")
+ @PreAuthorize("hasRole('ROLE_DFM')")
+ @ApiOperation(
+ value = "Lists the contents of the queue in this connection.",
+ response = ListingRequestEntity.class,
+ authorizations = {
+ @Authorization(value = "Data Flow Manager", type = "ROLE_DFM")
+ }
+ )
+ @ApiResponses(
+ value = {
+ @ApiResponse(code = 202, message = "The request has been accepted. A HTTP response header will contain the URI where the response can be polled."),
+ @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+ @ApiResponse(code = 401, message = "Client could not be authenticated."),
+ @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+ @ApiResponse(code = 404, message = "The specified resource could not be found."),
+ @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+ }
+ )
+ public Response getFlowFileListing(
+ @Context HttpServletRequest httpServletRequest,
+ @ApiParam(
+ value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
+ required = false
+ )
+ @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
+ @ApiParam(
+ value = "The connection id.",
+ required = true
+ )
+ @PathParam("connection-id") String id) {
+
+ // replicate if cluster manager; the replicated verb must match this endpoint's @POST mapping
+ if (properties.isClusterManager()) {
+ return clusterManager.applyRequest(HttpMethod.POST, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
+ }
+
+ // ensure the id is the same across the cluster
+ final String listingRequestId;
+ final ClusterContext clusterContext = ClusterContextThreadLocal.getContext();
+ if (clusterContext != null) {
+ // derive the id from the shared seed so the same seed always yields the same id
+ listingRequestId = UUID.nameUUIDFromBytes(clusterContext.getIdGenerationSeed().getBytes(StandardCharsets.UTF_8)).toString();
+ } else {
+ listingRequestId = UUID.randomUUID().toString();
+ }
+
+ // submit the listing request
+ final ListingRequestDTO listRequest = serviceFacade.createFlowFileListingRequest(groupId, id, listingRequestId);
+ listRequest.setUri(generateResourceUri("controller", "process-groups", groupId, "connections", id, "listing-requests", listRequest.getId()));
+
+ // create the revision
+ final RevisionDTO revision = new RevisionDTO();
+ revision.setClientId(clientId.getClientId());
+
+ // create the response entity
+ final ListingRequestEntity entity = new ListingRequestEntity();
+ entity.setRevision(revision);
+ entity.setListingRequest(listRequest);
+
+ // generate the URI where the response will be
+ final URI location = URI.create(listRequest.getUri());
+ return Response.status(Status.ACCEPTED).location(location).entity(entity).build();
+ }
+
+ /**
+ * Returns the current status of a previously submitted listing request.
+ *
+ * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
+ * @param connectionId The id of the connection
+ * @param listingRequestId The id of the listing request
+ * @return A listingRequestEntity
+ */
+ @GET
+ @Consumes(MediaType.WILDCARD)
+ @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
+ @Path("/{connection-id}/listing-requests/{listing-request-id}")
+ @PreAuthorize("hasRole('ROLE_DFM')")
+ @ApiOperation(
+ value = "Gets the current status of a listing request for the specified connection.",
+ response = ListingRequestEntity.class,
+ authorizations = {
+ @Authorization(value = "Data Flow Manager", type = "ROLE_DFM")
+ }
+ )
+ @ApiResponses(
+ value = {
+ @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+ @ApiResponse(code = 401, message = "Client could not be authenticated."),
+ @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+ @ApiResponse(code = 404, message = "The specified resource could not be found."),
+ @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+ }
+ )
+ public Response getListingRequest(
+ @ApiParam(
+ value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
+ required = false
+ )
+ @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
+ @ApiParam(
+ value = "The connection id.",
+ required = true
+ )
+ @PathParam("connection-id") String connectionId,
+ @ApiParam(
+ value = "The listing request id.",
+ required = true
+ )
+ @PathParam("listing-request-id") String listingRequestId) {
+
+ // the cluster manager does not answer itself; it replicates the GET and returns that response
+ if (properties.isClusterManager()) {
+ return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
+ }
+
+ // look the request up first, then attach the URI under which it is addressed
+ final ListingRequestDTO requestDto = serviceFacade.getFlowFileListingRequest(groupId, connectionId, listingRequestId);
+ final String requestUri = generateResourceUri("controller", "process-groups", groupId, "connections", connectionId, "listing-requests", listingRequestId);
+ requestDto.setUri(requestUri);
+
+ // the revision only carries the client id
+ final RevisionDTO clientRevision = new RevisionDTO();
+ clientRevision.setClientId(clientId.getClientId());
+
+ // wrap revision and listing request in the response entity
+ final ListingRequestEntity responseEntity = new ListingRequestEntity();
+ responseEntity.setRevision(clientRevision);
+ responseEntity.setListingRequest(requestDto);
+
+ return generateOkResponse(responseEntity).build();
+ }
+
+ /**
+  * Cancels and/or removes the specified listing request.
+  *
+  * @param httpServletRequest request
+  * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
+  * @param connectionId The connection id
+  * @param listingRequestId The listing request id
+  * @return A listingRequestEntity
+  */
+ @DELETE
+ @Consumes(MediaType.WILDCARD)
+ @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
+ @Path("/{connection-id}/listing-requests/{listing-request-id}")
+ @PreAuthorize("hasRole('ROLE_DFM')")
+ @ApiOperation(
+         value = "Cancels and/or removes a request to list the contents of this connection.",
+         response = ListingRequestEntity.class,
+         authorizations = {
+             @Authorization(value = "Data Flow Manager", type = "ROLE_DFM")
+         }
+ )
+ @ApiResponses(
+         value = {
+             @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+             @ApiResponse(code = 401, message = "Client could not be authenticated."),
+             @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+             @ApiResponse(code = 404, message = "The specified resource could not be found."),
+             @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+         }
+ )
+ public Response deleteListingRequest(
+         @Context HttpServletRequest httpServletRequest,
+         @ApiParam(
+                 value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
+                 required = false
+         )
+         @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
+         @ApiParam(
+                 value = "The connection id.",
+                 required = true
+         )
+         @PathParam("connection-id") String connectionId,
+         @ApiParam(
+                 value = "The listing request id.",
+                 required = true
+         )
+         @PathParam("listing-request-id") String listingRequestId) {
+
+     // replicate if cluster manager
+     if (properties.isClusterManager()) {
+         return clusterManager.applyRequest(HttpMethod.DELETE, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
+     }
+
+     // handle expects request (usually from the cluster manager)
+     final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER);
+     if (expects != null) {
+         return generateContinueResponse().build();
+     }
+
+     // delete the listing request
+     final ListingRequestDTO listingRequest = serviceFacade.deleteFlowFileListingRequest(groupId, connectionId, listingRequestId);
+     listingRequest.setUri(generateResourceUri("controller", "process-groups", groupId, "connections", connectionId, "listing-requests", listingRequestId));
+
+     // prune the results as they were already received when the listing completed
+     listingRequest.setFlowFileSummaries(null);
+
+     // create the revision
+     final RevisionDTO revision = new RevisionDTO();
+     revision.setClientId(clientId.getClientId());
+
+     // create the response entity
+     final ListingRequestEntity entity = new ListingRequestEntity();
+     entity.setRevision(revision);
+     entity.setListingRequest(listingRequest);
+
+     return generateOkResponse(entity).build();
+ }
+
+ /**
+ * Creates a request to delete the flowfiles in the queue of the specified connection.
+ *
+ * @param httpServletRequest request
+ * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
+ * @param id The id of the connection
+ * @return A dropRequestEntity
+ */
+ @POST
+ @Consumes(MediaType.WILDCARD)
+ @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
+ @Path("/{connection-id}/drop-requests")
+ @PreAuthorize("hasRole('ROLE_DFM')")
+ @ApiOperation(
+ value = "Creates a request to drop the contents of the queue in this connection.",
+ response = DropRequestEntity.class,
+ authorizations = {
+ @Authorization(value = "Data Flow Manager", type = "ROLE_DFM")
+ }
+ )
+ @ApiResponses(
+ value = {
+ @ApiResponse(code = 202, message = "The request has been accepted. A HTTP response header will contain the URI where the response can be polled."),
+ @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+ @ApiResponse(code = 401, message = "Client could not be authenticated."),
+ @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+ @ApiResponse(code = 404, message = "The specified resource could not be found."),
+ @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+ }
+ )
+ public Response createDropRequest(
+ @Context HttpServletRequest httpServletRequest,
+ @ApiParam(
+ value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
+ required = false
+ )
+ @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
+ @ApiParam(
+ value = "The connection id.",
+ required = true
+ )
+ @PathParam("connection-id") String id) {
+
// replicate if cluster manager
if (properties.isClusterManager()) {
return clusterManager.applyRequest(HttpMethod.DELETE, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
@@ -968,11 +1255,7 @@ public class ConnectionResource extends ApplicationResource {
// generate the URI where the response will be
final URI location = URI.create(dropRequest.getUri());
- if (dropRequest.isFinished()) {
- return generateCreatedResponse(location, entity).build();
- } else {
- return Response.status(Status.ACCEPTED).location(location).entity(entity).build();
- }
+ return Response.status(Status.ACCEPTED).location(location).entity(entity).build();
}
/**
@@ -1057,7 +1340,7 @@ public class ConnectionResource extends ApplicationResource {
@Path("/{connection-id}/drop-requests/{drop-request-id}")
@PreAuthorize("hasRole('ROLE_DFM')")
@ApiOperation(
- value = "Cancels and/or removes a request drop of the contents in this connection.",
+ value = "Cancels and/or removes a request to drop the contents of this connection.",
response = DropRequestEntity.class,
authorizations = {
@Authorization(value = "Data Flow Manager", type = "ROLE_DFM")
http://git-wip-us.apache.org/repos/asf/nifi/blob/e762d3c7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index f26d1b7..b93ff95 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -75,6 +75,9 @@ import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.Snippet;
import org.apache.nifi.controller.Template;
import org.apache.nifi.controller.label.Label;
+import org.apache.nifi.controller.queue.FlowFileSummary;
+import org.apache.nifi.controller.queue.ListFlowFileState;
+import org.apache.nifi.controller.queue.ListFlowFileStatus;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.PortStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
@@ -345,6 +348,49 @@ public final class DtoFactory {
return dto;
}
+ /**
+  * Determines whether the specified listing state is one of COMPLETE, CANCELED, or FAILURE.
+  *
+  * @param state state of a listing request
+  * @return true if the state equals COMPLETE, CANCELED, or FAILURE; false otherwise
+  */
+ private boolean isListingRequestComplete(final ListFlowFileState state) {
+     if (ListFlowFileState.COMPLETE.equals(state)) {
+         return true;
+     }
+     if (ListFlowFileState.CANCELED.equals(state)) {
+         return true;
+     }
+     return ListFlowFileState.FAILURE.equals(state);
+ }
+
+ /**
+  * Creates a ListingRequestDTO from the specified ListFlowFileStatus.
+  *
+  * The flowfile summaries are only copied into the dto once the request is in a finished state
+  * (see isListingRequestComplete).
+  *
+  * @param listingRequest status of the listing request
+  * @return dto
+  */
+ public ListingRequestDTO createListingRequestDTO(final ListFlowFileStatus listingRequest) {
+     // read the state a single time so the state, finished flag, percent completed and
+     // summaries reported in the dto always describe the same moment
+     final ListFlowFileState state = listingRequest.getState();
+     final boolean finished = isListingRequestComplete(state);
+
+     final ListingRequestDTO dto = new ListingRequestDTO();
+     dto.setId(listingRequest.getRequestIdentifier());
+     dto.setSubmissionTime(new Date(listingRequest.getRequestSubmissionTime()));
+     dto.setLastUpdated(new Date(listingRequest.getLastUpdated()));
+     dto.setState(state.toString());
+     dto.setFailureReason(listingRequest.getFailureReason());
+     dto.setFinished(finished);
+
+     if (finished) {
+         dto.setPercentCompleted(100);
+
+         final List<FlowFileSummary> flowFileSummaries = listingRequest.getFlowFileSummaries();
+         if (flowFileSummaries != null) {
+             final List<FlowFileSummaryDTO> summaryDtos = new ArrayList<>(flowFileSummaries.size());
+             for (final FlowFileSummary summary : flowFileSummaries) {
+                 summaryDtos.add(createFlowFileSummaryDTO(summary));
+             }
+             dto.setFlowFileSummaries(summaryDtos);
+         }
+     } else {
+         // fixed value; no progress figure is read from the status here
+         dto.setPercentCompleted(50);
+     }
+
+     return dto;
+ }
+
+ /**
+  * Creates a FlowFileSummaryDTO from the specified FlowFileSummary.
+  *
+  * @param summary summary of a flowfile
+  * @return dto
+  */
+ public FlowFileSummaryDTO createFlowFileSummaryDTO(final FlowFileSummary summary) {
+     final FlowFileSummaryDTO summaryDto = new FlowFileSummaryDTO();
+
+     // identity
+     summaryDto.setUuid(summary.getUuid());
+     summaryDto.setFilename(summary.getFilename());
+
+     // timestamps are wrapped in Date objects for the dto
+     final Date lastQueued = new Date(summary.lastQueuedTime());
+     summaryDto.setLastQueuedTime(lastQueued);
+     final Date lineageStart = new Date(summary.getLineageStartDate());
+     summaryDto.setLinageStartDate(lineageStart);
+
+     // remaining fields are copied over as-is
+     summaryDto.setPenalized(summary.isPenalized());
+     summaryDto.setPosition(summary.getPosition());
+     summaryDto.setSize(summary.getSize());
+
+     return summaryDto;
+ }
+
/**
* Creates a ConnectionDTO from the specified Connection.
*
http://git-wip-us.apache.org/repos/asf/nifi/blob/e762d3c7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java
index 2be4403..642c47e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java
@@ -19,6 +19,7 @@ package org.apache.nifi.web.dao;
import java.util.Set;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.queue.DropFlowFileStatus;
+import org.apache.nifi.controller.queue.ListFlowFileStatus;
import org.apache.nifi.web.api.dto.ConnectionDTO;
public interface ConnectionDAO {
@@ -43,6 +44,16 @@ public interface ConnectionDAO {
DropFlowFileStatus getFlowFileDropRequest(String groupId, String id, String dropRequestId);
/**
+ * Gets the specified flow file listing request.
+ *
+ * NOTE(review): the StandardConnectionDAO implementation throws a ResourceNotFoundException
+ * when the connection's queue returns no status for the given listing request id.
+ *
+ * @param groupId group id
+ * @param id connection id
+ * @param listingRequestId The listing request id
+ * @return The listing request status
+ */
+ ListFlowFileStatus getFlowFileListingRequest(String groupId, String id, String listingRequestId);
+
+ /**
* Gets the connections for the specified source processor.
*
* @param groupId group id
@@ -85,7 +96,17 @@ public interface ConnectionDAO {
* @param dropRequestId drop request id
* @return The drop request status
*/
- DropFlowFileStatus createFileFlowDropRequest(String groupId, String id, String dropRequestId);
+ DropFlowFileStatus createFlowFileDropRequest(String groupId, String id, String dropRequestId);
+
+ /**
+ * Creates a new flow file listing request.
+ *
+ * NOTE(review): the StandardConnectionDAO implementation locates the connection and hands the
+ * listing request id to the queue's listFlowFiles call, returning whatever status that yields.
+ *
+ * @param groupId group id
+ * @param id connection id
+ * @param listingRequestId listing request id
+ * @return The listing request status
+ */
+ ListFlowFileStatus createFlowFileListingRequest(String groupId, String id, String listingRequestId);
/**
* Verifies the create request can be processed.
@@ -137,4 +158,14 @@ public interface ConnectionDAO {
* @return The drop request
*/
DropFlowFileStatus deleteFlowFileDropRequest(String groupId, String id, String dropRequestId);
+
+ /**
+ * Deletes the specified flow file listing request.
+ *
+ * NOTE(review): the StandardConnectionDAO implementation does this by calling the queue's
+ * cancelListFlowFileRequest and throws a ResourceNotFoundException when that call returns null.
+ *
+ * @param groupId group id
+ * @param id connection id
+ * @param listingRequestId The listing request id
+ * @return The listing request status
+ */
+ ListFlowFileStatus deleteFlowFileListingRequest(String groupId, String id, String listingRequestId);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/e762d3c7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
index 565e5af..0e9a90a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
@@ -34,6 +34,7 @@ import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.exception.ValidationException;
import org.apache.nifi.controller.queue.DropFlowFileStatus;
import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.ListFlowFileStatus;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
@@ -87,6 +88,19 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO
}
@Override
+ public ListFlowFileStatus getFlowFileListingRequest(String groupId, String connectionId, String listingRequestId) {
+     // resolve the queue backing the requested connection
+     final FlowFileQueue flowFileQueue = locateConnection(groupId, connectionId).getFlowFileQueue();
+
+     // look up the status registered under the listing request id
+     final ListFlowFileStatus status = flowFileQueue.getListFlowFileStatus(listingRequestId);
+     if (status != null) {
+         return status;
+     }
+
+     // the queue returned nothing for this id
+     throw new ResourceNotFoundException(String.format("Unable to find listing request with id '%s'.", listingRequestId));
+ }
+
+ @Override
public Set<Connection> getConnectionsForSource(final String groupId, final String processorId) {
final Set<Connection> connections = new HashSet<>(getConnections(groupId));
for (final Iterator<Connection> connectionIter = connections.iterator(); connectionIter.hasNext();) {
@@ -312,7 +326,7 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO
}
@Override
- public DropFlowFileStatus createFileFlowDropRequest(String groupId, String id, String dropRequestId) {
+ public DropFlowFileStatus createFlowFileDropRequest(String groupId, String id, String dropRequestId) {
final Connection connection = locateConnection(groupId, id);
final FlowFileQueue queue = connection.getFlowFileQueue();
@@ -325,6 +339,13 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO
}
@Override
+ public ListFlowFileStatus createFlowFileListingRequest(String groupId, String id, String listingRequestId) {
+     // resolve the connection's queue and start the listing under the supplied request id
+     final FlowFileQueue flowFileQueue = locateConnection(groupId, id).getFlowFileQueue();
+     final ListFlowFileStatus status = flowFileQueue.listFlowFiles(listingRequestId);
+     return status;
+ }
+
+ @Override
public void verifyCreate(String groupId, ConnectionDTO connectionDTO) {
// validate the incoming request
final List<String> validationErrors = validateProposedConfiguration(groupId, connectionDTO);
@@ -508,6 +529,19 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO
return dropFlowFileStatus;
}
+ @Override
+ public ListFlowFileStatus deleteFlowFileListingRequest(String groupId, String connectionId, String listingRequestId) {
+     // resolve the queue backing the requested connection
+     final FlowFileQueue flowFileQueue = locateConnection(groupId, connectionId).getFlowFileQueue();
+
+     // ask the queue to cancel the request registered under the listing request id
+     final ListFlowFileStatus cancelled = flowFileQueue.cancelListFlowFileRequest(listingRequestId);
+     if (cancelled != null) {
+         return cancelled;
+     }
+
+     // the queue returned nothing for this id
+     throw new ResourceNotFoundException(String.format("Unable to find listing request with id '%s'.", listingRequestId));
+ }
+
/* setters */
public void setFlowController(final FlowController flowController) {
this.flowController = flowController;