You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/12/17 22:54:22 UTC

[3/5] nifi git commit: NIFI-108: Added merging of response for listing of flowfiles in cluster manager

NIFI-108: Added merging of response for listing of flowfiles in cluster manager


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/9408507e
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/9408507e
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/9408507e

Branch: refs/heads/NIFI-108
Commit: 9408507eb5733714537afb6266e6fe886609afdc
Parents: 64a415c
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Dec 17 16:46:12 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Dec 17 16:46:12 2015 -0500

----------------------------------------------------------------------
 .../nifi/controller/queue/FlowFileSummary.java  |   2 +-
 .../controller/queue/ListFlowFileStatus.java    |  15 +++
 .../nifi/controller/queue/SortColumn.java       |  64 +++++++++--
 .../nifi/web/api/dto/ListingRequestDTO.java     |  76 ++++++++++++-
 .../cluster/manager/impl/WebClusterManager.java | 111 ++++++++++++++++---
 .../controller/queue/ListFlowFileRequest.java   |  34 ++++--
 .../nifi/controller/FlowFileSummaries.java      |  95 ++++++++++++++++
 .../nifi/controller/StandardFlowFileQueue.java  |  16 ++-
 .../org/apache/nifi/web/api/dto/DtoFactory.java |  12 +-
 .../web/dao/impl/StandardConnectionDAO.java     |   2 +-
 10 files changed, 384 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/9408507e/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileSummary.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileSummary.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileSummary.java
index 8c37185..b7207f2 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileSummary.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileSummary.java
@@ -45,7 +45,7 @@ public interface FlowFileSummary {
     /**
      * @return the timestamp (in milliseconds since epoch) at which the FlowFile was added to the queue
      */
-    long lastQueuedTime();
+    long getLastQueuedTime();
 
     /**
      * @return the timestamp (in milliseconds since epoch) at which the FlowFile's greatest ancestor entered the flow

http://git-wip-us.apache.org/repos/asf/nifi/blob/9408507e/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java
index 5781283..cae500d 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java
@@ -22,6 +22,11 @@ import java.util.List;
 public interface ListFlowFileStatus {
 
     /**
+     * @return the maximum number of FlowFile Summary objects that should be returned
+     */
+    int getMaxResults();
+
+    /**
      * @return the identifier of the request to drop FlowFiles from the queue
      */
     String getRequestIdentifier();
@@ -72,4 +77,14 @@ public interface ListFlowFileStatus {
      * @return the percentage (an integer between 0 and 100, inclusive) of how close the request is to being completed
      */
     int getCompletionPercentage();
+
+    /**
+     * @return the total number of steps that are required in order to finish the listing
+     */
+    int getTotalStepCount();
+
+    /**
+     * @return the number of steps that have already been completed. The value returned will be between 0 and the result of calling {@link #getTotalStepCount()}, inclusive.
+     */
+    int getCompletedStepCount();
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/9408507e/nifi-api/src/main/java/org/apache/nifi/controller/queue/SortColumn.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/SortColumn.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/SortColumn.java
index 48f3e9f..30d285c 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/SortColumn.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/SortColumn.java
@@ -17,44 +17,92 @@
 
 package org.apache.nifi.controller.queue;
 
+import java.util.Comparator;
+
 /**
  * Specifies which column to sort on when performing a Listing of FlowFiles via
  * {@link FlowFileQueue#listFlowFiles(String, SortColumn, SortDirection)}
  */
-public enum SortColumn {
+public enum SortColumn implements Comparator<FlowFileSummary> {
     /**
      * Sort based on the current position in the queue
      */
-    QUEUE_POSITION,
+    QUEUE_POSITION (new Comparator<FlowFileSummary>() {
+        @Override
+        public int compare(final FlowFileSummary o1, final FlowFileSummary o2) {
+            return Integer.compare(o1.getPosition(), o2.getPosition());
+        }
+    }),
 
     /**
      * Sort based on the UUID of the FlowFile
      */
-    FLOWFILE_UUID,
+    FLOWFILE_UUID (new Comparator<FlowFileSummary>() {
+        @Override
+        public int compare(final FlowFileSummary o1, final FlowFileSummary o2) {
+            return o1.getUuid().compareTo(o2.getUuid());
+        }
+    }),
 
     /**
      * Sort based on the 'filename' attribute of the FlowFile
      */
-    FILENAME,
+    FILENAME (new Comparator<FlowFileSummary>() {
+        @Override
+        public int compare(final FlowFileSummary o1, final FlowFileSummary o2) {
+            return o1.getFilename().compareTo(o2.getFilename());
+        }
+    }),
 
     /**
      * Sort based on the size of the FlowFile
      */
-    FLOWFILE_SIZE,
+    FLOWFILE_SIZE(new Comparator<FlowFileSummary>() {
+        @Override
+        public int compare(final FlowFileSummary o1, final FlowFileSummary o2) {
+            return Long.compare(o1.getSize(), o2.getSize());
+        }
+    }),
 
     /**
      * Sort based on how long the FlowFile has been sitting in the queue
      */
-    QUEUED_DURATION,
+    QUEUED_DURATION (new Comparator<FlowFileSummary>() {
+        @Override
+        public int compare(final FlowFileSummary o1, final FlowFileSummary o2) {
+            return -Long.compare(o1.getLastQueuedTime(), o2.getLastQueuedTime());
+        }
+    }),
 
     /**
      * Sort based on the age of the FlowFile. I.e., the time at which the FlowFile's
      * "greatest ancestor" entered the flow
      */
-    FLOWFILE_AGE,
+    FLOWFILE_AGE (new Comparator<FlowFileSummary>() {
+        @Override
+        public int compare(final FlowFileSummary o1, final FlowFileSummary o2) {
+            return Long.compare(o1.getLineageStartDate(), o2.getLineageStartDate());
+        }
+    }),
 
     /**
      * Sort based on when the FlowFile's penalization ends
      */
-    PENALIZATION;
+    PENALIZATION (new Comparator<FlowFileSummary>() {
+        @Override
+        public int compare(final FlowFileSummary o1, final FlowFileSummary o2) {
+            return Boolean.compare(o1.isPenalized(), o2.isPenalized());
+        }
+    });
+
+    private final Comparator<FlowFileSummary> comparator;
+
+    private SortColumn(final Comparator<FlowFileSummary> comparator) {
+        this.comparator = comparator;
+    }
+
+    @Override
+    public int compare(final FlowFileSummary o1, final FlowFileSummary o2) {
+        return comparator.compare(o1, o2);
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/9408507e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ListingRequestDTO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ListingRequestDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ListingRequestDTO.java
index 53c2a74..36c0518 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ListingRequestDTO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ListingRequestDTO.java
@@ -16,13 +16,15 @@
  */
 package org.apache.nifi.web.api.dto;
 
-import com.wordnik.swagger.annotations.ApiModelProperty;
-import org.apache.nifi.web.api.dto.util.TimestampAdapter;
-
-import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
 import java.util.Date;
 import java.util.List;
 
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+
+import org.apache.nifi.web.api.dto.util.TimestampAdapter;
+
+import com.wordnik.swagger.annotations.ApiModelProperty;
+
 public class ListingRequestDTO {
 
     private String id;
@@ -34,6 +36,11 @@ public class ListingRequestDTO {
     private Integer percentCompleted;
     private Boolean finished;
     private String failureReason;
+    private String sortColumn;
+    private String sortDirection;
+    private Integer maxResults;
+    private Integer totalStepCount;
+    private Integer completedStepCount;
 
     private String state;
 
@@ -166,4 +173,65 @@ public class ListingRequestDTO {
     public void setFlowFileSummaries(List<FlowFileSummaryDTO> flowFileSummaries) {
         this.flowFileSummaries = flowFileSummaries;
     }
+
+    /**
+     * @return the column on which the listing is sorted
+     */
+    @ApiModelProperty(value = "The column on which the FlowFiles are sorted.")
+    public String getSortColumn() {
+        return sortColumn;
+    }
+
+    public void setSortColumn(String sortColumn) {
+        this.sortColumn = sortColumn;
+    }
+
+    /**
+     * @return the direction in which the FlowFiles are sorted
+     */
+    @ApiModelProperty(value = "The direction in which the FlowFiles are sorted. Either ASCENDING or DESCENDING.")
+    public String getSortDirection() {
+        return sortDirection;
+    }
+
+    public void setSortDirection(String sortDirection) {
+        this.sortDirection = sortDirection;
+    }
+
+    /**
+     * @return the maximum number of FlowFileSummary objects to return
+     */
+    @ApiModelProperty(value = "The maximum number of FlowFileSummary objects to return")
+    public Integer getMaxResults() {
+        return maxResults;
+    }
+
+    public void setMaxResults(Integer maxResults) {
+        this.maxResults = maxResults;
+    }
+
+
+    /**
+     * @return the total number of steps required to complete the listing
+     */
+    @ApiModelProperty(value = "The total number of steps required to complete the listing")
+    public Integer getTotalStepCount() {
+        return totalStepCount;
+    }
+
+    public void setTotalStepCount(Integer totalStepCount) {
+        this.totalStepCount = totalStepCount;
+    }
+
+    /**
+     * @return the number of steps that have already been completed. This value will be between 0 and the total step count (inclusive)
+     */
+    @ApiModelProperty(value = "The number of steps that have already been completed. This value will be between 0 and the total step count (inclusive)")
+    public Integer getCompletedStepCount() {
+        return completedStepCount;
+    }
+
+    public void setCompletedStepCount(Integer completedStepCount) {
+        this.completedStepCount = completedStepCount;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/9408507e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index 035a888..6b43f9d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -34,11 +34,13 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
+import java.util.NavigableSet;
 import java.util.Queue;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ConcurrentHashMap;
@@ -125,6 +127,7 @@ import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
 import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.FlowFileSummaries;
 import org.apache.nifi.controller.Heartbeater;
 import org.apache.nifi.controller.ReportingTaskNode;
 import org.apache.nifi.controller.ScheduledState;
@@ -132,6 +135,10 @@ import org.apache.nifi.controller.StandardFlowSerializer;
 import org.apache.nifi.controller.StandardProcessorNode;
 import org.apache.nifi.controller.ValidationContextFactory;
 import org.apache.nifi.controller.exception.ComponentLifeCycleException;
+import org.apache.nifi.controller.queue.DropFlowFileState;
+import org.apache.nifi.controller.queue.ListFlowFileState;
+import org.apache.nifi.controller.queue.SortColumn;
+import org.apache.nifi.controller.queue.SortDirection;
 import org.apache.nifi.controller.reporting.ClusteredReportingTaskNode;
 import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
 import org.apache.nifi.controller.reporting.ReportingTaskProvider;
@@ -142,6 +149,7 @@ import org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent;
 import org.apache.nifi.controller.service.ControllerServiceLoader;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.controller.service.ControllerServiceState;
 import org.apache.nifi.controller.service.StandardControllerServiceProvider;
 import org.apache.nifi.controller.status.ProcessGroupStatus;
 import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
@@ -192,13 +200,19 @@ import org.apache.nifi.util.ReflectionUtils;
 import org.apache.nifi.web.OptimisticLockingManager;
 import org.apache.nifi.web.Revision;
 import org.apache.nifi.web.UpdateRevision;
+import org.apache.nifi.web.api.dto.ControllerServiceDTO;
+import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
+import org.apache.nifi.web.api.dto.DropRequestDTO;
+import org.apache.nifi.web.api.dto.FlowFileSummaryDTO;
 import org.apache.nifi.web.api.dto.FlowSnippetDTO;
+import org.apache.nifi.web.api.dto.ListingRequestDTO;
 import org.apache.nifi.web.api.dto.NodeDTO;
 import org.apache.nifi.web.api.dto.ProcessGroupDTO;
 import org.apache.nifi.web.api.dto.ProcessorDTO;
 import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO;
 import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
 import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
+import org.apache.nifi.web.api.dto.ReportingTaskDTO;
 import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO;
 import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO;
 import org.apache.nifi.web.api.dto.provenance.ProvenanceRequestDTO;
@@ -207,7 +221,12 @@ import org.apache.nifi.web.api.dto.status.ClusterStatusHistoryDTO;
 import org.apache.nifi.web.api.dto.status.NodeStatusHistoryDTO;
 import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
 import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO;
+import org.apache.nifi.web.api.entity.ControllerServiceEntity;
+import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity;
+import org.apache.nifi.web.api.entity.ControllerServicesEntity;
+import org.apache.nifi.web.api.entity.DropRequestEntity;
 import org.apache.nifi.web.api.entity.FlowSnippetEntity;
+import org.apache.nifi.web.api.entity.ListingRequestEntity;
 import org.apache.nifi.web.api.entity.ProcessGroupEntity;
 import org.apache.nifi.web.api.entity.ProcessorEntity;
 import org.apache.nifi.web.api.entity.ProcessorsEntity;
@@ -215,6 +234,8 @@ import org.apache.nifi.web.api.entity.ProvenanceEntity;
 import org.apache.nifi.web.api.entity.ProvenanceEventEntity;
 import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
 import org.apache.nifi.web.api.entity.RemoteProcessGroupsEntity;
+import org.apache.nifi.web.api.entity.ReportingTaskEntity;
+import org.apache.nifi.web.api.entity.ReportingTasksEntity;
 import org.apache.nifi.web.util.WebUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -226,19 +247,6 @@ import org.xml.sax.SAXException;
 import org.xml.sax.SAXParseException;
 
 import com.sun.jersey.api.client.ClientResponse;
-import org.apache.nifi.controller.queue.DropFlowFileState;
-
-import org.apache.nifi.controller.service.ControllerServiceState;
-import org.apache.nifi.web.api.dto.ControllerServiceDTO;
-import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
-import org.apache.nifi.web.api.dto.DropRequestDTO;
-import org.apache.nifi.web.api.dto.ReportingTaskDTO;
-import org.apache.nifi.web.api.entity.ControllerServiceEntity;
-import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity;
-import org.apache.nifi.web.api.entity.ControllerServicesEntity;
-import org.apache.nifi.web.api.entity.DropRequestEntity;
-import org.apache.nifi.web.api.entity.ReportingTaskEntity;
-import org.apache.nifi.web.api.entity.ReportingTasksEntity;
 
 /**
  * Provides a cluster manager implementation. The manager federates incoming HTTP client requests to the nodes' external API using the HTTP protocol. The manager also communicates with nodes using the
@@ -319,6 +327,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
     public static final String REPORTING_TASKS_URI = "/nifi-api/controller/reporting-tasks/node";
     public static final Pattern REPORTING_TASK_URI_PATTERN = Pattern.compile("/nifi-api/controller/reporting-tasks/node/[a-f0-9\\-]{36}");
 
+    @Deprecated
     public static final Pattern QUEUE_CONTENTS_URI = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/contents");
     public static final Pattern DROP_REQUEST_URI = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/drop-requests/[a-f0-9\\-]{36}");
     public static final Pattern LIST_FLOWFILES_URI = Pattern
@@ -2831,6 +2840,64 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         return normalizedValidationErrors;
     }
 
+
+    /**
+     * Merges the listing requests in the specified map into the specified listing request
+     *
+     * @param listingRequest the target listing request
+     * @param listingRequestMap the mapping of all responses being merged
+     */
+    private void mergeListingRequests(final ListingRequestDTO listingRequest, final Map<NodeIdentifier, ListingRequestDTO> listingRequestMap) {
+        final Comparator<FlowFileSummaryDTO> comparator = FlowFileSummaries.createDTOComparator(
+            SortColumn.valueOf(listingRequest.getSortColumn()), SortDirection.valueOf(listingRequest.getSortDirection()));
+
+        final NavigableSet<FlowFileSummaryDTO> flowFileSummaries = new TreeSet<>(comparator);
+
+        ListFlowFileState state = null;
+        int sumOfPercents = 0;
+        boolean finished = true;
+        for (final ListingRequestDTO nodeRequest : listingRequestMap.values()) {
+            Integer percentComplete = nodeRequest.getPercentCompleted();
+            if (percentComplete != null) {
+                sumOfPercents += percentComplete;
+            }
+
+            if (!nodeRequest.getFinished()) {
+                finished = false;
+            }
+
+            if (nodeRequest.getLastUpdated().after(listingRequest.getLastUpdated())) {
+                listingRequest.setLastUpdated(nodeRequest.getLastUpdated());
+            }
+
+            // Keep the state with the lowest ordinal value (the "least completed").
+            final ListFlowFileState nodeState = ListFlowFileState.valueOfDescription(nodeRequest.getState());
+            if (state == null || state.compareTo(nodeState) > 0) {
+                state = nodeState;
+            }
+
+            for (final FlowFileSummaryDTO summaryDTO : nodeRequest.getFlowFileSummaries()) {
+                flowFileSummaries.add(summaryDTO);
+
+                // Keep the set from growing beyond our max
+                if (flowFileSummaries.size() > listingRequest.getMaxResults()) {
+                    flowFileSummaries.pollLast();
+                }
+            }
+
+            if (nodeRequest.getFailureReason() != null) {
+                listingRequest.setFailureReason(nodeRequest.getFailureReason());
+            }
+        }
+
+        final List<FlowFileSummaryDTO> summaryDTOs = new ArrayList<>(flowFileSummaries);
+        listingRequest.setFlowFileSummaries(summaryDTOs);
+
+        final int percentCompleted = sumOfPercents / listingRequestMap.size();
+        listingRequest.setPercentCompleted(percentCompleted);
+        listingRequest.setFinished(finished);
+    }
+
     /**
      * Merges the drop requests in the specified map into the specified drop request.
      *
@@ -3316,7 +3383,23 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
 
             clientResponse = new NodeResponse(clientResponse, responseEntity);
         } else if (hasSuccessfulClientResponse && isListFlowFilesEndpoint(uri, method)) {
-            // TODO: IMPLEMENT
+            final ListingRequestEntity responseEntity = clientResponse.getClientResponse().getEntity(ListingRequestEntity.class);
+            final ListingRequestDTO listingRequest = responseEntity.getListingRequest();
+
+            final Map<NodeIdentifier, ListingRequestDTO> resultsMap = new HashMap<>();
+            for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
+                if (problematicNodeResponses.contains(nodeResponse)) {
+                    continue;
+                }
+
+                final ListingRequestEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ListingRequestEntity.class);
+                final ListingRequestDTO nodeListingRequest = nodeResponseEntity.getListingRequest();
+
+                resultsMap.put(nodeResponse.getNodeId(), nodeListingRequest);
+            }
+            mergeListingRequests(listingRequest, resultsMap);
+
+            clientResponse = new NodeResponse(clientResponse, responseEntity);
         } else {
             if (!nodeResponsesToDrain.isEmpty()) {
                 drainResponses(nodeResponsesToDrain);

http://git-wip-us.apache.org/repos/asf/nifi/blob/9408507e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java
index e83fd80..aad4c4f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java
@@ -23,6 +23,7 @@ import java.util.List;
 
 public class ListFlowFileRequest implements ListFlowFileStatus {
     private final String requestId;
+    private final int maxResults;
     private final QueueSize queueSize;
     private final SortColumn sortColumn;
     private final SortDirection sortDirection;
@@ -32,13 +33,14 @@ public class ListFlowFileRequest implements ListFlowFileStatus {
     private ListFlowFileState state;
     private String failureReason;
     private int numSteps;
-    private int numCompletedSteps;
+    private int completedStepCount;
     private long lastUpdated = System.currentTimeMillis();
 
-    public ListFlowFileRequest(final String requestId, final SortColumn sortColumn, final SortDirection sortDirection, final QueueSize queueSize, final int numSteps) {
+    public ListFlowFileRequest(final String requestId, final SortColumn sortColumn, final SortDirection sortDirection, final int maxResults, final QueueSize queueSize, final int numSteps) {
         this.requestId = requestId;
         this.sortColumn = sortColumn;
         this.sortDirection = sortDirection;
+        this.maxResults = maxResults;
         this.queueSize = queueSize;
         this.numSteps = numSteps;
     }
@@ -94,11 +96,10 @@ public class ListFlowFileRequest implements ListFlowFileStatus {
         return Collections.unmodifiableList(flowFileSummaries);
     }
 
-    public synchronized void addFlowFileSummaries(final List<FlowFileSummary> summaries) {
-        // TODO: Implement.
-
+    public synchronized void setFlowFileSummaries(final List<FlowFileSummary> summaries) {
+        this.flowFileSummaries.clear();
+        this.flowFileSummaries.addAll(summaries);
         lastUpdated = System.currentTimeMillis();
-        this.numCompletedSteps++;
     }
 
     @Override
@@ -117,6 +118,25 @@ public class ListFlowFileRequest implements ListFlowFileStatus {
 
     @Override
     public synchronized int getCompletionPercentage() {
-        return (int) (100F * numCompletedSteps / numSteps);
+        return (int) (100F * completedStepCount / numSteps);
+    }
+
+    public synchronized void setCompletedStepCount(final int completedStepCount) {
+        this.completedStepCount = completedStepCount;
+    }
+
+    @Override
+    public int getMaxResults() {
+        return maxResults;
+    }
+
+    @Override
+    public int getTotalStepCount() {
+        return numSteps;
+    }
+
+    @Override
+    public int getCompletedStepCount() {
+        return completedStepCount;
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/9408507e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowFileSummaries.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowFileSummaries.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowFileSummaries.java
new file mode 100644
index 0000000..5a0a3ab
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowFileSummaries.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller;
+
+import java.util.Collections;
+import java.util.Comparator;
+
+import org.apache.nifi.controller.queue.FlowFileSummary;
+import org.apache.nifi.controller.queue.SortColumn;
+import org.apache.nifi.controller.queue.SortDirection;
+import org.apache.nifi.web.api.dto.FlowFileSummaryDTO;
+
+public class FlowFileSummaries {
+
+    public static Comparator<FlowFileSummary> createComparator(final SortColumn column, final SortDirection direction) {
+        final Comparator<FlowFileSummary> comparator = new Comparator<FlowFileSummary>() {
+            @Override
+            public int compare(final FlowFileSummary o1, final FlowFileSummary o2) {
+                switch (column) {
+                    case FILENAME:
+                        return o1.getFilename().compareTo(o2.getFilename());
+                    case FLOWFILE_AGE:
+                        return Long.compare(o1.getLineageStartDate(), o2.getLineageStartDate());
+                    case FLOWFILE_SIZE:
+                        return Long.compare(o1.getSize(), o2.getSize());
+                    case FLOWFILE_UUID:
+                        return o1.getUuid().compareTo(o2.getUuid());
+                    case PENALIZATION:
+                        return Boolean.compare(o1.isPenalized(), o2.isPenalized());
+                    case QUEUE_POSITION:
+                        return Long.compare(o1.getPosition(), o2.getPosition());
+                    case QUEUED_DURATION:
+                        return Long.compare(o1.getLastQueuedTime(), o2.getLastQueuedTime());
+                }
+
+                return 0;
+            }
+        };
+
+
+        if (direction == SortDirection.DESCENDING) {
+            return Collections.reverseOrder(comparator);
+        } else {
+            return comparator;
+        }
+    }
+
+    public static Comparator<FlowFileSummaryDTO> createDTOComparator(final SortColumn column, final SortDirection direction) {
+        final Comparator<FlowFileSummaryDTO> comparator = new Comparator<FlowFileSummaryDTO>() {
+            @Override
+            public int compare(final FlowFileSummaryDTO o1, final FlowFileSummaryDTO o2) {
+                switch (column) {
+                    case FILENAME:
+                        return o1.getFilename().compareTo(o2.getFilename());
+                    case FLOWFILE_AGE:
+                        return o1.getLinageStartDate().compareTo(o2.getLinageStartDate());
+                    case FLOWFILE_SIZE:
+                        return Long.compare(o1.getSize(), o2.getSize());
+                    case FLOWFILE_UUID:
+                        return o1.getUuid().compareTo(o2.getUuid());
+                    case PENALIZATION:
+                        return Boolean.compare(o1.getPenalized(), o2.getPenalized());
+                    case QUEUE_POSITION:
+                        return Long.compare(o1.getPosition(), o2.getPosition());
+                    case QUEUED_DURATION:
+                        return o1.getLastQueuedTime().compareTo(o2.getLastQueuedTime());
+                }
+
+                return 0;
+            }
+        };
+
+        if (direction == SortDirection.DESCENDING) {
+            return Collections.reverseOrder(comparator);
+        } else {
+            return comparator;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/9408507e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index 9935ba4..daaa763 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -894,12 +894,13 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
         final int numSteps;
         readLock.lock();
         try {
-            numSteps = 1 + swapLocations.size();
+            // numSteps = 1 for each swap location + 1 for active queue + 1 for swap queue.
+            numSteps = 2 + swapLocations.size();
         } finally {
             readLock.unlock("listFlowFiles");
         }
 
-        final ListFlowFileRequest listRequest = new ListFlowFileRequest(requestIdentifier, sortColumn, direction, size(), numSteps);
+        final ListFlowFileRequest listRequest = new ListFlowFileRequest(requestIdentifier, sortColumn, direction, maxResults, size(), numSteps);
 
         final Thread t = new Thread(new Runnable() {
             @Override
@@ -907,6 +908,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
                 int position = 0;
                 int resultCount = 0;
                 final List<FlowFileSummary> summaries = new ArrayList<>(activeQueue.size());
+                int completedStepCount = 0;
 
                 // we need a write lock while using the Active Queue because we can't iterate over it - we have to poll from it
                 // continually. This is because the iterator for PriorityQueue does not iterate over the elements in any particular
@@ -935,6 +937,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
                     writeLock.unlock("List FlowFiles");
                 }
 
+                listRequest.setCompletedStepCount(++completedStepCount);
+
                 position = activeQueue.size();
                 sourceLoop: while (resultCount < maxResults) {
                     try {
@@ -954,6 +958,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
                                         }
                                     }
                                 }
+
+                                listRequest.setCompletedStepCount(++completedStepCount);
                             }
 
                             for (final FlowFileRecord flowFile : swapQueue) {
@@ -966,6 +972,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
                                     }
                                 }
                             }
+
+                            listRequest.setCompletedStepCount(++completedStepCount);
                         } finally {
                             readLock.unlock("List FlowFiles");
                         }
@@ -974,6 +982,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
                         listRequest.setFailure("Could not read FlowFiles from queue. Check log files for more details.");
                     }
                 }
+
+                listRequest.setFlowFileSummaries(summaries);
             }
         }, "List FlowFiles for Connection " + getIdentifier());
         t.setDaemon(true);
@@ -1018,7 +1028,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
             }
 
             @Override
-            public long lastQueuedTime() {
+            public long getLastQueuedTime() {
                 return lastQueuedTime;
             }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/9408507e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index b93ff95..0805b54 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -360,10 +360,14 @@ public final class DtoFactory {
         dto.setState(listingRequest.getState().toString());
         dto.setFailureReason(listingRequest.getFailureReason());
         dto.setFinished(isListingRequestComplete(listingRequest.getState()));
+        dto.setMaxResults(listingRequest.getMaxResults());
+        dto.setSortColumn(listingRequest.getSortColumn().name());
+        dto.setSortDirection(listingRequest.getSortDirection().name());
+        dto.setTotalStepCount(listingRequest.getTotalStepCount());
+        dto.setCompletedStepCount(listingRequest.getCompletedStepCount());
+        dto.setPercentCompleted(listingRequest.getCompletionPercentage());
 
         if (isListingRequestComplete(listingRequest.getState())) {
-            dto.setPercentCompleted(100);
-
             final List<FlowFileSummary> flowFileSummaries = listingRequest.getFlowFileSummaries();
             if (flowFileSummaries != null) {
                 final List<FlowFileSummaryDTO> summaryDtos = new ArrayList<>(flowFileSummaries.size());
@@ -372,8 +376,6 @@ public final class DtoFactory {
                 }
                 dto.setFlowFileSummaries(summaryDtos);
             }
-        } else {
-            dto.setPercentCompleted(50);
         }
 
         return dto;
@@ -383,7 +385,7 @@ public final class DtoFactory {
         final FlowFileSummaryDTO dto = new FlowFileSummaryDTO();
         dto.setUuid(summary.getUuid());
         dto.setFilename(summary.getFilename());
-        dto.setLastQueuedTime(new Date(summary.lastQueuedTime()));
+        dto.setLastQueuedTime(new Date(summary.getLastQueuedTime()));
         dto.setLinageStartDate(new Date(summary.getLineageStartDate()));
         dto.setPenalized(summary.isPenalized());
         dto.setPosition(summary.getPosition());

http://git-wip-us.apache.org/repos/asf/nifi/blob/9408507e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
index 0e9a90a..459c2b5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
@@ -342,7 +342,7 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO
     public ListFlowFileStatus createFlowFileListingRequest(String groupId, String id, String listingRequestId) {
         final Connection connection = locateConnection(groupId, id);
         final FlowFileQueue queue = connection.getFlowFileQueue();
-        return queue.listFlowFiles(listingRequestId);
+        return queue.listFlowFiles(listingRequestId, 100);
     }
 
     @Override