You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/12/17 22:54:22 UTC
[3/5] nifi git commit: NIFI-108: Added merging of response for
listing of flowfiles in cluster manager
NIFI-108: Added merging of response for listing of flowfiles in cluster manager
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/9408507e
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/9408507e
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/9408507e
Branch: refs/heads/NIFI-108
Commit: 9408507eb5733714537afb6266e6fe886609afdc
Parents: 64a415c
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Dec 17 16:46:12 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Dec 17 16:46:12 2015 -0500
----------------------------------------------------------------------
.../nifi/controller/queue/FlowFileSummary.java | 2 +-
.../controller/queue/ListFlowFileStatus.java | 15 +++
.../nifi/controller/queue/SortColumn.java | 64 +++++++++--
.../nifi/web/api/dto/ListingRequestDTO.java | 76 ++++++++++++-
.../cluster/manager/impl/WebClusterManager.java | 111 ++++++++++++++++---
.../controller/queue/ListFlowFileRequest.java | 34 ++++--
.../nifi/controller/FlowFileSummaries.java | 95 ++++++++++++++++
.../nifi/controller/StandardFlowFileQueue.java | 16 ++-
.../org/apache/nifi/web/api/dto/DtoFactory.java | 12 +-
.../web/dao/impl/StandardConnectionDAO.java | 2 +-
10 files changed, 384 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/9408507e/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileSummary.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileSummary.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileSummary.java
index 8c37185..b7207f2 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileSummary.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileSummary.java
@@ -45,7 +45,7 @@ public interface FlowFileSummary {
/**
* @return the timestamp (in milliseconds since epoch) at which the FlowFile was added to the queue
*/
- long lastQueuedTime();
+ long getLastQueuedTime();
/**
* @return the timestamp (in milliseconds since epoch) at which the FlowFile's greatest ancestor entered the flow
http://git-wip-us.apache.org/repos/asf/nifi/blob/9408507e/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java
index 5781283..cae500d 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java
@@ -22,6 +22,11 @@ import java.util.List;
public interface ListFlowFileStatus {
/**
+ * @return the maximum number of FlowFile Summary objects that should be returned
+ */
+ int getMaxResults();
+
+ /**
* @return the identifier of the request to drop FlowFiles from the queue
*/
String getRequestIdentifier();
@@ -72,4 +77,14 @@ public interface ListFlowFileStatus {
* @return the percentage (an integer between 0 and 100, inclusive) of how close the request is to being completed
*/
int getCompletionPercentage();
+
+ /**
+ * @return the total number of steps that are required in order to finish the listing
+ */
+ int getTotalStepCount();
+
+ /**
+ * @return the total number of steps that have already been completed. The value returned will be >= 0 and <= the result of calling {@link #getTotalStepCount()}.
+ */
+ int getCompletedStepCount();
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/9408507e/nifi-api/src/main/java/org/apache/nifi/controller/queue/SortColumn.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/SortColumn.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/SortColumn.java
index 48f3e9f..30d285c 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/SortColumn.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/SortColumn.java
@@ -17,44 +17,92 @@
package org.apache.nifi.controller.queue;
+import java.util.Comparator;
+
/**
* Specifies which column to sort on when performing a Listing of FlowFiles via
* {@link FlowFileQueue#listFlowFiles(String, SortColumn, SortDirection)}
*/
-public enum SortColumn {
+public enum SortColumn implements Comparator<FlowFileSummary> {
/**
* Sort based on the current position in the queue
*/
- QUEUE_POSITION,
+ QUEUE_POSITION (new Comparator<FlowFileSummary>() {
+ @Override
+ public int compare(final FlowFileSummary o1, final FlowFileSummary o2) {
+ return Integer.compare(o1.getPosition(), o2.getPosition());
+ }
+ }),
/**
* Sort based on the UUID of the FlowFile
*/
- FLOWFILE_UUID,
+ FLOWFILE_UUID (new Comparator<FlowFileSummary>() {
+ @Override
+ public int compare(final FlowFileSummary o1, final FlowFileSummary o2) {
+ return o1.getUuid().compareTo(o2.getUuid());
+ }
+ }),
/**
* Sort based on the 'filename' attribute of the FlowFile
*/
- FILENAME,
+ FILENAME (new Comparator<FlowFileSummary>() {
+ @Override
+ public int compare(final FlowFileSummary o1, final FlowFileSummary o2) {
+ return o1.getFilename().compareTo(o2.getFilename());
+ }
+ }),
/**
* Sort based on the size of the FlowFile
*/
- FLOWFILE_SIZE,
+ FLOWFILE_SIZE(new Comparator<FlowFileSummary>() {
+ @Override
+ public int compare(final FlowFileSummary o1, final FlowFileSummary o2) {
+ return Long.compare(o1.getSize(), o2.getSize());
+ }
+ }),
/**
* Sort based on how long the FlowFile has been sitting in the queue
*/
- QUEUED_DURATION,
+ QUEUED_DURATION (new Comparator<FlowFileSummary>() {
+ @Override
+ public int compare(final FlowFileSummary o1, final FlowFileSummary o2) {
+ return -Long.compare(o1.getLastQueuedTime(), o2.getLastQueuedTime());
+ }
+ }),
/**
* Sort based on the age of the FlowFile. I.e., the time at which the FlowFile's
* "greatest ancestor" entered the flow
*/
- FLOWFILE_AGE,
+ FLOWFILE_AGE (new Comparator<FlowFileSummary>() {
+ @Override
+ public int compare(final FlowFileSummary o1, final FlowFileSummary o2) {
+ return Long.compare(o1.getLineageStartDate(), o2.getLineageStartDate());
+ }
+ }),
/**
* Sort based on when the FlowFile's penalization ends
*/
- PENALIZATION;
+ PENALIZATION (new Comparator<FlowFileSummary>() {
+ @Override
+ public int compare(final FlowFileSummary o1, final FlowFileSummary o2) {
+ return Boolean.compare(o1.isPenalized(), o2.isPenalized());
+ }
+ });
+
+ private final Comparator<FlowFileSummary> comparator;
+
+ private SortColumn(final Comparator<FlowFileSummary> comparator) {
+ this.comparator = comparator;
+ }
+
+ @Override
+ public int compare(final FlowFileSummary o1, final FlowFileSummary o2) {
+ return comparator.compare(o1, o2);
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/9408507e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ListingRequestDTO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ListingRequestDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ListingRequestDTO.java
index 53c2a74..36c0518 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ListingRequestDTO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ListingRequestDTO.java
@@ -16,13 +16,15 @@
*/
package org.apache.nifi.web.api.dto;
-import com.wordnik.swagger.annotations.ApiModelProperty;
-import org.apache.nifi.web.api.dto.util.TimestampAdapter;
-
-import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
import java.util.Date;
import java.util.List;
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+
+import org.apache.nifi.web.api.dto.util.TimestampAdapter;
+
+import com.wordnik.swagger.annotations.ApiModelProperty;
+
public class ListingRequestDTO {
private String id;
@@ -34,6 +36,11 @@ public class ListingRequestDTO {
private Integer percentCompleted;
private Boolean finished;
private String failureReason;
+ private String sortColumn;
+ private String sortDirection;
+ private Integer maxResults;
+ private Integer totalStepCount;
+ private Integer completedStepCount;
private String state;
@@ -166,4 +173,65 @@ public class ListingRequestDTO {
public void setFlowFileSummaries(List<FlowFileSummaryDTO> flowFileSummaries) {
this.flowFileSummaries = flowFileSummaries;
}
+
+ /**
+ * @return the column on which the listing is sorted
+ */
+ @ApiModelProperty(value = "The column on which the FlowFiles are sorted.")
+ public String getSortColumn() {
+ return sortColumn;
+ }
+
+ public void setSortColumn(String sortColumn) {
+ this.sortColumn = sortColumn;
+ }
+
+ /**
+ * @return the direction in which the FlowFiles are sorted
+ */
+ @ApiModelProperty(value = "The direction in which the FlowFiles are sorted. Either ASCENDING or DESCENDING.")
+ public String getSortDirection() {
+ return sortDirection;
+ }
+
+ public void setSortDirection(String sortDirection) {
+ this.sortDirection = sortDirection;
+ }
+
+ /**
+ * @return the maximum number of FlowFileSummary objects to return
+ */
+ @ApiModelProperty(value = "The maximum number of FlowFileSummary objects to return")
+ public Integer getMaxResults() {
+ return maxResults;
+ }
+
+ public void setMaxResults(Integer maxResults) {
+ this.maxResults = maxResults;
+ }
+
+
+ /**
+ * @return the total number of steps required to complete the listing
+ */
+ @ApiModelProperty(value = "The total number of steps required to complete the listing")
+ public Integer getTotalStepCount() {
+ return totalStepCount;
+ }
+
+ public void setTotalStepCount(Integer totalStepCount) {
+ this.totalStepCount = totalStepCount;
+ }
+
+ /**
+ * @return the number of steps that have already been completed. This value will be >= 0 and <= the total step count
+ */
+ @ApiModelProperty(value = "The number of steps that have already been completed. This value will be between 0 and the total step count (inclusive)")
+ public Integer getCompletedStepCount() {
+ return completedStepCount;
+ }
+
+ public void setCompletedStepCount(Integer completedStepCount) {
+ this.completedStepCount = completedStepCount;
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/9408507e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index 035a888..6b43f9d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -34,11 +34,13 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
+import java.util.NavigableSet;
import java.util.Queue;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeMap;
+import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
@@ -125,6 +127,7 @@ import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.FlowFileSummaries;
import org.apache.nifi.controller.Heartbeater;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ScheduledState;
@@ -132,6 +135,10 @@ import org.apache.nifi.controller.StandardFlowSerializer;
import org.apache.nifi.controller.StandardProcessorNode;
import org.apache.nifi.controller.ValidationContextFactory;
import org.apache.nifi.controller.exception.ComponentLifeCycleException;
+import org.apache.nifi.controller.queue.DropFlowFileState;
+import org.apache.nifi.controller.queue.ListFlowFileState;
+import org.apache.nifi.controller.queue.SortColumn;
+import org.apache.nifi.controller.queue.SortDirection;
import org.apache.nifi.controller.reporting.ClusteredReportingTaskNode;
import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
import org.apache.nifi.controller.reporting.ReportingTaskProvider;
@@ -142,6 +149,7 @@ import org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent;
import org.apache.nifi.controller.service.ControllerServiceLoader;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.controller.service.StandardControllerServiceProvider;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
@@ -192,13 +200,19 @@ import org.apache.nifi.util.ReflectionUtils;
import org.apache.nifi.web.OptimisticLockingManager;
import org.apache.nifi.web.Revision;
import org.apache.nifi.web.UpdateRevision;
+import org.apache.nifi.web.api.dto.ControllerServiceDTO;
+import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
+import org.apache.nifi.web.api.dto.DropRequestDTO;
+import org.apache.nifi.web.api.dto.FlowFileSummaryDTO;
import org.apache.nifi.web.api.dto.FlowSnippetDTO;
+import org.apache.nifi.web.api.dto.ListingRequestDTO;
import org.apache.nifi.web.api.dto.NodeDTO;
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
+import org.apache.nifi.web.api.dto.ReportingTaskDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceRequestDTO;
@@ -207,7 +221,12 @@ import org.apache.nifi.web.api.dto.status.ClusterStatusHistoryDTO;
import org.apache.nifi.web.api.dto.status.NodeStatusHistoryDTO;
import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO;
+import org.apache.nifi.web.api.entity.ControllerServiceEntity;
+import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity;
+import org.apache.nifi.web.api.entity.ControllerServicesEntity;
+import org.apache.nifi.web.api.entity.DropRequestEntity;
import org.apache.nifi.web.api.entity.FlowSnippetEntity;
+import org.apache.nifi.web.api.entity.ListingRequestEntity;
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.apache.nifi.web.api.entity.ProcessorsEntity;
@@ -215,6 +234,8 @@ import org.apache.nifi.web.api.entity.ProvenanceEntity;
import org.apache.nifi.web.api.entity.ProvenanceEventEntity;
import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
import org.apache.nifi.web.api.entity.RemoteProcessGroupsEntity;
+import org.apache.nifi.web.api.entity.ReportingTaskEntity;
+import org.apache.nifi.web.api.entity.ReportingTasksEntity;
import org.apache.nifi.web.util.WebUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -226,19 +247,6 @@ import org.xml.sax.SAXException;
import org.xml.sax.SAXParseException;
import com.sun.jersey.api.client.ClientResponse;
-import org.apache.nifi.controller.queue.DropFlowFileState;
-
-import org.apache.nifi.controller.service.ControllerServiceState;
-import org.apache.nifi.web.api.dto.ControllerServiceDTO;
-import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
-import org.apache.nifi.web.api.dto.DropRequestDTO;
-import org.apache.nifi.web.api.dto.ReportingTaskDTO;
-import org.apache.nifi.web.api.entity.ControllerServiceEntity;
-import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity;
-import org.apache.nifi.web.api.entity.ControllerServicesEntity;
-import org.apache.nifi.web.api.entity.DropRequestEntity;
-import org.apache.nifi.web.api.entity.ReportingTaskEntity;
-import org.apache.nifi.web.api.entity.ReportingTasksEntity;
/**
* Provides a cluster manager implementation. The manager federates incoming HTTP client requests to the nodes' external API using the HTTP protocol. The manager also communicates with nodes using the
@@ -319,6 +327,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
public static final String REPORTING_TASKS_URI = "/nifi-api/controller/reporting-tasks/node";
public static final Pattern REPORTING_TASK_URI_PATTERN = Pattern.compile("/nifi-api/controller/reporting-tasks/node/[a-f0-9\\-]{36}");
+ @Deprecated
public static final Pattern QUEUE_CONTENTS_URI = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/contents");
public static final Pattern DROP_REQUEST_URI = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/drop-requests/[a-f0-9\\-]{36}");
public static final Pattern LIST_FLOWFILES_URI = Pattern
@@ -2831,6 +2840,64 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
return normalizedValidationErrors;
}
+
+ /**
+ * Merges the listing requests in the specified map into the specified listing request
+ *
+ * @param listingRequest the target listing request
+ * @param listingRequestMap the mapping of all responses being merged
+ */
+ private void mergeListingRequests(final ListingRequestDTO listingRequest, final Map<NodeIdentifier, ListingRequestDTO> listingRequestMap) {
+ final Comparator<FlowFileSummaryDTO> comparator = FlowFileSummaries.createDTOComparator(
+ SortColumn.valueOf(listingRequest.getSortColumn()), SortDirection.valueOf(listingRequest.getSortDirection()));
+
+ final NavigableSet<FlowFileSummaryDTO> flowFileSummaries = new TreeSet<>(comparator);
+
+ ListFlowFileState state = null;
+ int sumOfPercents = 0;
+ boolean finished = true;
+ for (final ListingRequestDTO nodeRequest : listingRequestMap.values()) {
+ Integer percentComplete = nodeRequest.getPercentCompleted();
+ if (percentComplete != null) {
+ sumOfPercents += percentComplete;
+ }
+
+ if (!nodeRequest.getFinished()) {
+ finished = false;
+ }
+
+ if (nodeRequest.getLastUpdated().after(listingRequest.getLastUpdated())) {
+ listingRequest.setLastUpdated(nodeRequest.getLastUpdated());
+ }
+
+ // Keep the state with the lowest ordinal value (the "least completed").
+ final ListFlowFileState nodeState = ListFlowFileState.valueOfDescription(nodeRequest.getState());
+ if (state == null || state.compareTo(nodeState) > 0) {
+ state = nodeState;
+ }
+
+ for (final FlowFileSummaryDTO summaryDTO : nodeRequest.getFlowFileSummaries()) {
+ flowFileSummaries.add(summaryDTO);
+
+ // Keep the set from growing beyond our max
+ if (flowFileSummaries.size() > listingRequest.getMaxResults()) {
+ flowFileSummaries.pollLast();
+ }
+ }
+
+ if (nodeRequest.getFailureReason() != null) {
+ listingRequest.setFailureReason(nodeRequest.getFailureReason());
+ }
+ }
+
+ final List<FlowFileSummaryDTO> summaryDTOs = new ArrayList<>(flowFileSummaries);
+ listingRequest.setFlowFileSummaries(summaryDTOs);
+
+ final int percentCompleted = sumOfPercents / listingRequestMap.size();
+ listingRequest.setPercentCompleted(percentCompleted);
+ listingRequest.setFinished(finished);
+ }
+
/**
* Merges the drop requests in the specified map into the specified drop request.
*
@@ -3316,7 +3383,23 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
clientResponse = new NodeResponse(clientResponse, responseEntity);
} else if (hasSuccessfulClientResponse && isListFlowFilesEndpoint(uri, method)) {
- // TODO: IMPLEMENT
+ final ListingRequestEntity responseEntity = clientResponse.getClientResponse().getEntity(ListingRequestEntity.class);
+ final ListingRequestDTO listingRequest = responseEntity.getListingRequest();
+
+ final Map<NodeIdentifier, ListingRequestDTO> resultsMap = new HashMap<>();
+ for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
+ if (problematicNodeResponses.contains(nodeResponse)) {
+ continue;
+ }
+
+ final ListingRequestEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ListingRequestEntity.class);
+ final ListingRequestDTO nodeListingRequest = nodeResponseEntity.getListingRequest();
+
+ resultsMap.put(nodeResponse.getNodeId(), nodeListingRequest);
+ }
+ mergeListingRequests(listingRequest, resultsMap);
+
+ clientResponse = new NodeResponse(clientResponse, responseEntity);
} else {
if (!nodeResponsesToDrain.isEmpty()) {
drainResponses(nodeResponsesToDrain);
http://git-wip-us.apache.org/repos/asf/nifi/blob/9408507e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java
index e83fd80..aad4c4f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java
@@ -23,6 +23,7 @@ import java.util.List;
public class ListFlowFileRequest implements ListFlowFileStatus {
private final String requestId;
+ private final int maxResults;
private final QueueSize queueSize;
private final SortColumn sortColumn;
private final SortDirection sortDirection;
@@ -32,13 +33,14 @@ public class ListFlowFileRequest implements ListFlowFileStatus {
private ListFlowFileState state;
private String failureReason;
private int numSteps;
- private int numCompletedSteps;
+ private int completedStepCount;
private long lastUpdated = System.currentTimeMillis();
- public ListFlowFileRequest(final String requestId, final SortColumn sortColumn, final SortDirection sortDirection, final QueueSize queueSize, final int numSteps) {
+ public ListFlowFileRequest(final String requestId, final SortColumn sortColumn, final SortDirection sortDirection, final int maxResults, final QueueSize queueSize, final int numSteps) {
this.requestId = requestId;
this.sortColumn = sortColumn;
this.sortDirection = sortDirection;
+ this.maxResults = maxResults;
this.queueSize = queueSize;
this.numSteps = numSteps;
}
@@ -94,11 +96,10 @@ public class ListFlowFileRequest implements ListFlowFileStatus {
return Collections.unmodifiableList(flowFileSummaries);
}
- public synchronized void addFlowFileSummaries(final List<FlowFileSummary> summaries) {
- // TODO: Implement.
-
+ public synchronized void setFlowFileSummaries(final List<FlowFileSummary> summaries) {
+ this.flowFileSummaries.clear();
+ this.flowFileSummaries.addAll(summaries);
lastUpdated = System.currentTimeMillis();
- this.numCompletedSteps++;
}
@Override
@@ -117,6 +118,25 @@ public class ListFlowFileRequest implements ListFlowFileStatus {
@Override
public synchronized int getCompletionPercentage() {
- return (int) (100F * numCompletedSteps / numSteps);
+ return (int) (100F * completedStepCount / numSteps);
+ }
+
+ public synchronized void setCompletedStepCount(final int completedStepCount) {
+ this.completedStepCount = completedStepCount;
+ }
+
+ @Override
+ public int getMaxResults() {
+ return maxResults;
+ }
+
+ @Override
+ public int getTotalStepCount() {
+ return numSteps;
+ }
+
+ @Override
+ public int getCompletedStepCount() {
+ return completedStepCount;
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/9408507e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowFileSummaries.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowFileSummaries.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowFileSummaries.java
new file mode 100644
index 0000000..5a0a3ab
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowFileSummaries.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller;
+
+import java.util.Collections;
+import java.util.Comparator;
+
+import org.apache.nifi.controller.queue.FlowFileSummary;
+import org.apache.nifi.controller.queue.SortColumn;
+import org.apache.nifi.controller.queue.SortDirection;
+import org.apache.nifi.web.api.dto.FlowFileSummaryDTO;
+
+public class FlowFileSummaries {
+
+ public static Comparator<FlowFileSummary> createComparator(final SortColumn column, final SortDirection direction) {
+ final Comparator<FlowFileSummary> comparator = new Comparator<FlowFileSummary>() {
+ @Override
+ public int compare(final FlowFileSummary o1, final FlowFileSummary o2) {
+ switch (column) {
+ case FILENAME:
+ return o1.getFilename().compareTo(o2.getFilename());
+ case FLOWFILE_AGE:
+ return Long.compare(o1.getLineageStartDate(), o2.getLineageStartDate());
+ case FLOWFILE_SIZE:
+ return Long.compare(o1.getSize(), o2.getSize());
+ case FLOWFILE_UUID:
+ return o1.getUuid().compareTo(o2.getUuid());
+ case PENALIZATION:
+ return Boolean.compare(o1.isPenalized(), o2.isPenalized());
+ case QUEUE_POSITION:
+ return Long.compare(o1.getPosition(), o2.getPosition());
+ case QUEUED_DURATION:
+ return Long.compare(o1.getLastQueuedTime(), o2.getLastQueuedTime());
+ }
+
+ return 0;
+ }
+ };
+
+
+ if (direction == SortDirection.DESCENDING) {
+ return Collections.reverseOrder(comparator);
+ } else {
+ return comparator;
+ }
+ }
+
+ public static Comparator<FlowFileSummaryDTO> createDTOComparator(final SortColumn column, final SortDirection direction) {
+ final Comparator<FlowFileSummaryDTO> comparator = new Comparator<FlowFileSummaryDTO>() {
+ @Override
+ public int compare(final FlowFileSummaryDTO o1, final FlowFileSummaryDTO o2) {
+ switch (column) {
+ case FILENAME:
+ return o1.getFilename().compareTo(o2.getFilename());
+ case FLOWFILE_AGE:
+ return o1.getLinageStartDate().compareTo(o2.getLinageStartDate());
+ case FLOWFILE_SIZE:
+ return Long.compare(o1.getSize(), o2.getSize());
+ case FLOWFILE_UUID:
+ return o1.getUuid().compareTo(o2.getUuid());
+ case PENALIZATION:
+ return Boolean.compare(o1.getPenalized(), o2.getPenalized());
+ case QUEUE_POSITION:
+ return Long.compare(o1.getPosition(), o2.getPosition());
+ case QUEUED_DURATION:
+ return o1.getLastQueuedTime().compareTo(o2.getLastQueuedTime());
+ }
+
+ return 0;
+ }
+ };
+
+ if (direction == SortDirection.DESCENDING) {
+ return Collections.reverseOrder(comparator);
+ } else {
+ return comparator;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/9408507e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index 9935ba4..daaa763 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -894,12 +894,13 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
final int numSteps;
readLock.lock();
try {
- numSteps = 1 + swapLocations.size();
+ // numSteps = 1 for each swap location + 1 for active queue + 1 for swap queue.
+ numSteps = 2 + swapLocations.size();
} finally {
readLock.unlock("listFlowFiles");
}
- final ListFlowFileRequest listRequest = new ListFlowFileRequest(requestIdentifier, sortColumn, direction, size(), numSteps);
+ final ListFlowFileRequest listRequest = new ListFlowFileRequest(requestIdentifier, sortColumn, direction, maxResults, size(), numSteps);
final Thread t = new Thread(new Runnable() {
@Override
@@ -907,6 +908,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
int position = 0;
int resultCount = 0;
final List<FlowFileSummary> summaries = new ArrayList<>(activeQueue.size());
+ int completedStepCount = 0;
// we need a write lock while using the Active Queue because we can't iterate over it - we have to poll from it
// continually. This is because the iterator for PriorityQueue does not iterate over the elements in any particular
@@ -935,6 +937,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
writeLock.unlock("List FlowFiles");
}
+ listRequest.setCompletedStepCount(++completedStepCount);
+
position = activeQueue.size();
sourceLoop: while (resultCount < maxResults) {
try {
@@ -954,6 +958,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
}
}
}
+
+ listRequest.setCompletedStepCount(++completedStepCount);
}
for (final FlowFileRecord flowFile : swapQueue) {
@@ -966,6 +972,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
}
}
}
+
+ listRequest.setCompletedStepCount(++completedStepCount);
} finally {
readLock.unlock("List FlowFiles");
}
@@ -974,6 +982,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
listRequest.setFailure("Could not read FlowFiles from queue. Check log files for more details.");
}
}
+
+ listRequest.setFlowFileSummaries(summaries);
}
}, "List FlowFiles for Connection " + getIdentifier());
t.setDaemon(true);
@@ -1018,7 +1028,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
}
@Override
- public long lastQueuedTime() {
+ public long getLastQueuedTime() {
return lastQueuedTime;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/9408507e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index b93ff95..0805b54 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -360,10 +360,14 @@ public final class DtoFactory {
dto.setState(listingRequest.getState().toString());
dto.setFailureReason(listingRequest.getFailureReason());
dto.setFinished(isListingRequestComplete(listingRequest.getState()));
+ dto.setMaxResults(listingRequest.getMaxResults());
+ dto.setSortColumn(listingRequest.getSortColumn().name());
+ dto.setSortDirection(listingRequest.getSortDirection().name());
+ dto.setTotalStepCount(listingRequest.getTotalStepCount());
+ dto.setCompletedStepCount(listingRequest.getCompletedStepCount());
+ dto.setPercentCompleted(listingRequest.getCompletionPercentage());
if (isListingRequestComplete(listingRequest.getState())) {
- dto.setPercentCompleted(100);
-
final List<FlowFileSummary> flowFileSummaries = listingRequest.getFlowFileSummaries();
if (flowFileSummaries != null) {
final List<FlowFileSummaryDTO> summaryDtos = new ArrayList<>(flowFileSummaries.size());
@@ -372,8 +376,6 @@ public final class DtoFactory {
}
dto.setFlowFileSummaries(summaryDtos);
}
- } else {
- dto.setPercentCompleted(50);
}
return dto;
@@ -383,7 +385,7 @@ public final class DtoFactory {
final FlowFileSummaryDTO dto = new FlowFileSummaryDTO();
dto.setUuid(summary.getUuid());
dto.setFilename(summary.getFilename());
- dto.setLastQueuedTime(new Date(summary.lastQueuedTime()));
+ dto.setLastQueuedTime(new Date(summary.getLastQueuedTime()));
dto.setLinageStartDate(new Date(summary.getLineageStartDate()));
dto.setPenalized(summary.isPenalized());
dto.setPosition(summary.getPosition());
http://git-wip-us.apache.org/repos/asf/nifi/blob/9408507e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
index 0e9a90a..459c2b5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
@@ -342,7 +342,7 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO
public ListFlowFileStatus createFlowFileListingRequest(String groupId, String id, String listingRequestId) {
final Connection connection = locateConnection(groupId, id);
final FlowFileQueue queue = connection.getFlowFileQueue();
- return queue.listFlowFiles(listingRequestId);
+ return queue.listFlowFiles(listingRequestId, 100);
}
@Override