You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/12/17 22:54:20 UTC

[1/5] nifi git commit: NIFI-108: Implementing ability to list FlowFiles in a queue

Repository: nifi
Updated Branches:
  refs/heads/NIFI-108 670733753 -> b12aba782


NIFI-108: Implementing ability to list FlowFiles in a queue


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/245d8d4d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/245d8d4d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/245d8d4d

Branch: refs/heads/NIFI-108
Commit: 245d8d4d2d3fbd6182e7e587d3433cc7378cc37a
Parents: 2e22954
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Dec 17 13:00:30 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Dec 17 13:00:30 2015 -0500

----------------------------------------------------------------------
 .../nifi/controller/queue/FlowFileQueue.java    |  33 ++-
 .../controller/queue/ListFlowFileStatus.java    |   5 +
 .../cluster/manager/impl/WebClusterManager.java |   8 +
 .../controller/queue/ListFlowFileRequest.java   |  79 +++++--
 .../apache/nifi/controller/FlowController.java  |  39 +++-
 .../nifi/controller/StandardFlowFileQueue.java  | 234 ++++++++++++++++++-
 6 files changed, 363 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/245d8d4d/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
index fe8649d..dbf2f04 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.controller.queue;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Queue;
@@ -217,12 +218,32 @@ public interface FlowFileQueue {
      * will be returned ordered by the position of the FlowFile in the queue.
      *
      * @param requestIdentifier the identifier of the List FlowFile Request
+     * @param maxResults the maximum number of FlowFileSummary objects to add to the ListFlowFileStatus
+     *
+     * @return the status for the request
+     *
+     * @throws IllegalStateException if either the source or the destination of the connection to which this queue belongs
+     *             is currently running.
+     */
+    ListFlowFileStatus listFlowFiles(String requestIdentifier, int maxResults);
+
+    /**
+     * Initiates a request to obtain a listing of FlowFiles in this queue. This method returns a
+     * ListFlowFileStatus that can be used to obtain information about the FlowFiles that exist
+     * within the queue. Additionally, the ListFlowFileStatus provides a request identifier that
+     * can then be passed to the {@link #getListFlowFileStatus(String)} method to obtain the current status of the request.
+     *
+     * @param requestIdentifier the identifier of the List FlowFile Request
+     * @param maxResults the maximum number of FlowFileSummary objects to add to the ListFlowFileStatus
+     * @param sortColumn specifies which column to sort on
+     * @param direction specifies which direction to sort the FlowFiles
+     *
      * @return the status for the request
      *
      * @throws IllegalStateException if either the source or the destination of the connection to which this queue belongs
      *             is currently running.
      */
-    ListFlowFileStatus listFlowFiles(String requestIdentifier);
+    ListFlowFileStatus listFlowFiles(String requestIdentifier, int maxResults, SortColumn sortColumn, SortDirection direction);
 
     /**
      * Initiates a request to obtain a listing of FlowFiles in this queue. This method returns a
@@ -231,6 +252,9 @@ public interface FlowFileQueue {
      * can then be passed to the {@link #getListFlowFileStatus(String)}
      *
      * @param requestIdentifier the identifier of the List FlowFile Request
+     * @param maxResults the maximum number of FlowFileSummary objects to add to the ListFlowFileStatus
+     * @param query an Expression Language expression that will be evaluated against all FlowFiles. Only FlowFiles that satisfy the expression will
+     *            be included in the results. The expression must be a valid expression and return a Boolean type
      * @param sortColumn specifies which column to sort on
      * @param direction specifies which direction to sort the FlowFiles
      *
@@ -238,8 +262,9 @@ public interface FlowFileQueue {
      *
      * @throws IllegalStateException if either the source or the destination of the connection to which this queue belongs
      *             is currently running.
+     * @throws IllegalArgumentException if query is not a valid Expression Language expression or does not return a boolean type
      */
-    ListFlowFileStatus listFlowFiles(String requestIdentifier, SortColumn sortColumn, SortDirection direction);
+    ListFlowFileStatus listFlowFiles(String requestIdentifier, int maxResults, String query, SortColumn sortColumn, SortDirection direction);
 
     /**
      * Returns the current status of a List FlowFile Request that was initiated via the {@link #listFlowFiles(String)}
@@ -269,6 +294,8 @@ public interface FlowFileQueue {
      * @param flowFileUuid the UUID of the FlowFile to retrieve
      * @return the FlowFile with the given UUID or <code>null</code> if no FlowFile can be found in this queue
      *         with the given UUID
+     *
+     * @throws IOException if unable to read FlowFiles that are stored on some external device
      */
-    FlowFileRecord getFlowFile(String flowFileUuid);
+    FlowFileRecord getFlowFile(String flowFileUuid) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/245d8d4d/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java
index 2959170..5781283 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java
@@ -67,4 +67,9 @@ public interface ListFlowFileStatus {
      * @return a List of FlowFileSummary objects
      */
     List<FlowFileSummary> getFlowFileSummaries();
+
+    /**
+     * @return the percentage (an integer between 0 and 100, inclusive) of how close the request is to being completed
+     */
+    int getCompletionPercentage();
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/245d8d4d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index 5e9dd3c..035a888 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -321,6 +321,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
 
     public static final Pattern QUEUE_CONTENTS_URI = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/contents");
     public static final Pattern DROP_REQUEST_URI = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/drop-requests/[a-f0-9\\-]{36}");
+    public static final Pattern LIST_FLOWFILES_URI = Pattern
+        .compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/listing-requests/[a-f0-9\\-]{36}");
 
     private final NiFiProperties properties;
     private final HttpRequestReplicator httpRequestReplicator;
@@ -2431,6 +2433,10 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         return "GET".equalsIgnoreCase(method) && PROVENANCE_EVENT_URI.matcher(uri.getPath()).matches();
     }
 
+    private static boolean isListFlowFilesEndpoint(final URI uri, final String method) {
+        return "GET".equalsIgnoreCase(method) && LIST_FLOWFILES_URI.matcher(uri.getPath()).matches();
+    }
+
     private static boolean isCountersEndpoint(final URI uri) {
         return COUNTERS_URI.matcher(uri.getPath()).matches();
     }
@@ -3309,6 +3315,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
             mergeDropRequests(dropRequest, resultsMap);
 
             clientResponse = new NodeResponse(clientResponse, responseEntity);
+        } else if (hasSuccessfulClientResponse && isListFlowFilesEndpoint(uri, method)) {
+            // TODO: IMPLEMENT
         } else {
             if (!nodeResponsesToDrain.isEmpty()) {
                 drainResponses(nodeResponsesToDrain);

http://git-wip-us.apache.org/repos/asf/nifi/blob/245d8d4d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java
index 03e0188..e83fd80 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java
@@ -17,63 +17,93 @@
 
 package org.apache.nifi.controller.queue;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 public class ListFlowFileRequest implements ListFlowFileStatus {
+    private final String requestId;
+    private final QueueSize queueSize;
+    private final SortColumn sortColumn;
+    private final SortDirection sortDirection;
+    private final long submissionTime = System.currentTimeMillis();
+    private final List<FlowFileSummary> flowFileSummaries = new ArrayList<>();
+
     private ListFlowFileState state;
+    private String failureReason;
+    private int numSteps;
+    private int numCompletedSteps;
+    private long lastUpdated = System.currentTimeMillis();
+
+    public ListFlowFileRequest(final String requestId, final SortColumn sortColumn, final SortDirection sortDirection, final QueueSize queueSize, final int numSteps) {
+        this.requestId = requestId;
+        this.sortColumn = sortColumn;
+        this.sortDirection = sortDirection;
+        this.queueSize = queueSize;
+        this.numSteps = numSteps;
+    }
 
     @Override
     public String getRequestIdentifier() {
-        // TODO Auto-generated method stub
-        return null;
+        return requestId;
     }
 
     @Override
     public long getRequestSubmissionTime() {
-        // TODO Auto-generated method stub
-        return 0;
+        return submissionTime;
     }
 
     @Override
-    public long getLastUpdated() {
-        // TODO Auto-generated method stub
-        return 0;
+    public synchronized long getLastUpdated() {
+        return lastUpdated;
     }
 
     @Override
     public SortColumn getSortColumn() {
-        // TODO Auto-generated method stub
-        return null;
+        return sortColumn;
     }
 
     @Override
     public SortDirection getSortDirection() {
-        // TODO Auto-generated method stub
-        return null;
+        return sortDirection;
     }
 
     @Override
-    public ListFlowFileState getState() {
-        // TODO Auto-generated method stub
-        return null;
+    public synchronized ListFlowFileState getState() {
+        return state;
     }
 
     @Override
-    public String getFailureReason() {
-        // TODO Auto-generated method stub
-        return null;
+    public synchronized String getFailureReason() {
+        return failureReason;
+    }
+
+    public synchronized void setState(final ListFlowFileState state) {
+        this.state = state;
+        this.lastUpdated = System.currentTimeMillis();
+    }
+
+    public synchronized void setFailure(final String explanation) {
+        this.state = ListFlowFileState.FAILURE;
+        this.failureReason = explanation;
+        this.lastUpdated = System.currentTimeMillis();
     }
 
     @Override
-    public List<FlowFileSummary> getFlowFileSummaries() {
-        // TODO Auto-generated method stub
-        return null;
+    public synchronized List<FlowFileSummary> getFlowFileSummaries() {
+        return Collections.unmodifiableList(flowFileSummaries);
+    }
+
+    public synchronized void addFlowFileSummaries(final List<FlowFileSummary> summaries) {
+        // TODO: Implement.
+
+        lastUpdated = System.currentTimeMillis();
+        this.numCompletedSteps++;
     }
 
     @Override
     public QueueSize getQueueSize() {
-        // TODO Auto-generated method stub
-        return null;
+        return queueSize;
     }
 
     public synchronized boolean cancel() {
@@ -84,4 +114,9 @@ public class ListFlowFileRequest implements ListFlowFileStatus {
         this.state = ListFlowFileState.CANCELED;
         return true;
     }
+
+    @Override
+    public synchronized int getCompletionPercentage() {
+        return (int) (100F * numCompletedSteps / numSteps);
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/245d8d4d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 2ad102f..dd3b687 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -177,6 +177,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.net.ssl.SSLContext;
+
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -3242,8 +3244,41 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     }
 
     public InputStream getContent(final FlowFileRecord flowFile, final String requestor, final String requestUri) throws IOException {
-        // TODO: IMPLEMENT
-        return null;
+        requireNonNull(flowFile);
+        requireNonNull(requestor);
+        requireNonNull(requestUri);
+
+        final InputStream stream;
+        final ResourceClaim resourceClaim;
+        final ContentClaim contentClaim = flowFile.getContentClaim();
+        if (contentClaim == null) {
+            resourceClaim = null;
+            stream = new ByteArrayInputStream(new byte[0]);
+        } else {
+            resourceClaim = flowFile.getContentClaim().getResourceClaim();
+            stream = contentRepository.read(flowFile.getContentClaim());
+        }
+
+        // Register a Provenance Event to indicate that the content was downloaded.
+        final StandardProvenanceEventRecord.Builder sendEventBuilder = new StandardProvenanceEventRecord.Builder()
+            .setEventType(ProvenanceEventType.DOWNLOAD)
+            .setFlowFileUUID(flowFile.getAttribute(CoreAttributes.UUID.key()))
+            .setAttributes(flowFile.getAttributes(), Collections.<String, String> emptyMap())
+            .setTransitUri(requestUri)
+            .setEventTime(System.currentTimeMillis())
+            .setFlowFileEntryDate(flowFile.getEntryDate())
+            .setLineageStartDate(flowFile.getLineageStartDate())
+            .setComponentType(getName())
+            .setComponentId(getRootGroupId())
+            .setDetails("Download of Content requested by " + requestor + " for " + flowFile);
+
+        if (contentClaim != null) {
+            sendEventBuilder.setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), contentClaim.getOffset(), flowFile.getSize());
+        }
+
+        final ProvenanceEventRecord sendEvent = sendEventBuilder.build();
+        provenanceEventRepository.registerEvent(sendEvent);
+        return stream;
     }
 
     private String getReplayFailureReason(final ProvenanceEventRecord event) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/245d8d4d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index cf0d185..9935ba4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -35,11 +35,16 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.nifi.attribute.expression.language.PreparedQuery;
+import org.apache.nifi.attribute.expression.language.Query;
+import org.apache.nifi.attribute.expression.language.exception.AttributeExpressionLanguageParsingException;
 import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.controller.queue.DropFlowFileState;
 import org.apache.nifi.controller.queue.DropFlowFileStatus;
 import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.FlowFileSummary;
 import org.apache.nifi.controller.queue.ListFlowFileRequest;
+import org.apache.nifi.controller.queue.ListFlowFileState;
 import org.apache.nifi.controller.queue.ListFlowFileStatus;
 import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.controller.queue.SortColumn;
@@ -53,8 +58,10 @@ import org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.expression.AttributeExpression.ResultType;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.FlowFileFilter;
 import org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult;
@@ -100,7 +107,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
     private final int swapThreshold;
     private final FlowFileSwapManager swapManager;
     private final List<String> swapLocations = new ArrayList<>();
-    @SuppressWarnings("unused")
     private final TimedLock readLock;
     private final TimedLock writeLock;
     private final String identifier;
@@ -841,16 +847,194 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
 
 
     @Override
-    public ListFlowFileStatus listFlowFiles(final String requestIdentifier) {
-        return listFlowFiles(requestIdentifier, SortColumn.QUEUE_POSITION, SortDirection.ASCENDING);
+    public ListFlowFileStatus listFlowFiles(final String requestIdentifier, final int maxResults) {
+        return listFlowFiles(requestIdentifier, maxResults, SortColumn.QUEUE_POSITION, SortDirection.ASCENDING);
     }
 
     @Override
-    public ListFlowFileStatus listFlowFiles(final String requestIdentifier, final SortColumn sortColumn, final SortDirection direction) {
-        // TODO: Implement
-        return null;
+    public ListFlowFileStatus listFlowFiles(final String requestIdentifier, final int maxResults, final SortColumn sortColumn, final SortDirection direction) {
+        return listFlowFiles(requestIdentifier, maxResults, null, sortColumn, direction);
+    }
+
+    @Override
+    public ListFlowFileStatus listFlowFiles(final String requestIdentifier, final int maxResults, final String query, final SortColumn sortColumn, final SortDirection direction) {
+        final PreparedQuery preparedQuery;
+        if (query == null) {
+            preparedQuery = null;
+        } else {
+            try {
+                final ResultType resultType = Query.compile(query).getResultType();
+                if (resultType != ResultType.BOOLEAN) {
+                    throw new IllegalArgumentException("Invalid expression Language provided to search the listing of FlowFiles. "
+                        + "The expression must return a 'Boolean' type but returns a " + resultType.name() + " type");
+                }
+                preparedQuery = Query.prepare(query);
+            } catch (final AttributeExpressionLanguageParsingException e) {
+                throw new IllegalArgumentException("Invalid Expression Language provided to search the listing of FlowFiles: " + query, e);
+            }
+        }
+
+        // purge any old requests from the map just to keep it clean. But if there are very few requests, which is usually the case, then don't bother
+        if (listRequestMap.size() > 10) {
+            final List<String> toDrop = new ArrayList<>();
+            for (final Map.Entry<String, ListFlowFileRequest> entry : listRequestMap.entrySet()) {
+                final ListFlowFileRequest request = entry.getValue();
+                final boolean completed = request.getState() == ListFlowFileState.COMPLETE || request.getState() == ListFlowFileState.FAILURE;
+
+                if (completed && System.currentTimeMillis() - request.getLastUpdated() > TimeUnit.MINUTES.toMillis(5L)) {
+                    toDrop.add(entry.getKey());
+                }
+            }
+
+            for (final String requestId : toDrop) {
+                listRequestMap.remove(requestId);
+            }
+        }
+
+        final int numSteps;
+        readLock.lock();
+        try {
+            numSteps = 1 + swapLocations.size();
+        } finally {
+            readLock.unlock("listFlowFiles");
+        }
+
+        final ListFlowFileRequest listRequest = new ListFlowFileRequest(requestIdentifier, sortColumn, direction, size(), numSteps);
+
+        final Thread t = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                int position = 0;
+                int resultCount = 0;
+                final List<FlowFileSummary> summaries = new ArrayList<>(activeQueue.size());
+
+                // we need a write lock while using the Active Queue because we can't iterate over it - we have to poll from it
+                // continually. This is because the iterator for PriorityQueue does not iterate over the elements in any particular
+                // order. Since we need the 'position' of the element in the queue, we need to iterate over them in the proper order.
+                writeLock.lock();
+                try {
+                    final List<FlowFileRecord> flowFileRecords = new ArrayList<>(activeQueue.size());
+
+                    FlowFileRecord flowFile;
+                    try {
+                        while ((flowFile = activeQueue.poll()) != null) {
+                            flowFileRecords.add(flowFile);
+                            position++;
+
+                            if (preparedQuery == null || "true".equals(preparedQuery.evaluateExpressions(flowFile))) {
+                                summaries.add(summarize(flowFile, position));
+                                if (++resultCount >= maxResults) {
+                                    break;
+                                }
+                            }
+                        }
+                    } finally {
+                        activeQueue.addAll(flowFileRecords);
+                    }
+                } finally {
+                    writeLock.unlock("List FlowFiles");
+                }
+
+                position = activeQueue.size();
+                sourceLoop: while (resultCount < maxResults) {
+                    try {
+                        // We are now iterating over swap files, and we don't need the write lock for this, just the read lock, since
+                        // we are not modifying anything.
+                        readLock.lock();
+                        try {
+                            for (final String location : swapLocations) {
+                                final List<FlowFileRecord> flowFiles = swapManager.peek(location, StandardFlowFileQueue.this);
+                                for (final FlowFileRecord flowFile : flowFiles) {
+                                    position++;
+
+                                    if (preparedQuery == null || "true".equals(preparedQuery.evaluateExpressions(flowFile))) {
+                                        summaries.add(summarize(flowFile, position));
+                                        if (++resultCount >= maxResults) {
+                                            break sourceLoop;
+                                        }
+                                    }
+                                }
+                            }
+
+                            for (final FlowFileRecord flowFile : swapQueue) {
+                                position++;
+
+                                if (preparedQuery == null || "true".equals(preparedQuery.evaluateExpressions(flowFile))) {
+                                    summaries.add(summarize(flowFile, position));
+                                    if (++resultCount >= maxResults) {
+                                        break sourceLoop;
+                                    }
+                                }
+                            }
+                        } finally {
+                            readLock.unlock("List FlowFiles");
+                        }
+                    } catch (final IOException ioe) {
+                        logger.error("Failed to read swapped FlowFiles in order to perform listing of queue " + StandardFlowFileQueue.this, ioe);
+                        listRequest.setFailure("Could not read FlowFiles from queue. Check log files for more details.");
+                    }
+                }
+            }
+        }, "List FlowFiles for Connection " + getIdentifier());
+        t.setDaemon(true);
+        t.start();
+
+        listRequestMap.put(requestIdentifier, listRequest);
+        return listRequest;
     }
 
+    private FlowFileSummary summarize(final FlowFile flowFile, final int position) {
+        // extract all of the information that we care about into new variables rather than just
+        // wrapping the FlowFile object with a FlowFileSummary object. We do this because we want to
+        // be able to hold many FlowFileSummary objects in memory and if we just wrap the FlowFile object,
+        // we will end up holding the entire FlowFile (including all Attributes) in the Java heap as well,
+        // which can be problematic if we expect them to be swapped out.
+        final String uuid = flowFile.getAttribute(CoreAttributes.UUID.key());
+        final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key());
+        final long size = flowFile.getSize();
+        final long lastQueuedTime = flowFile.getLastQueueDate();
+        final long lineageStart = flowFile.getLineageStartDate();
+        final boolean penalized = flowFile.isPenalized();
+
+        return new FlowFileSummary() {
+            @Override
+            public String getUuid() {
+                return uuid;
+            }
+
+            @Override
+            public String getFilename() {
+                return filename;
+            }
+
+            @Override
+            public int getPosition() {
+                return position;
+            }
+
+            @Override
+            public long getSize() {
+                return size;
+            }
+
+            @Override
+            public long lastQueuedTime() {
+                return lastQueuedTime;
+            }
+
+            @Override
+            public long getLineageStartDate() {
+                return lineageStart;
+            }
+
+            @Override
+            public boolean isPenalized() {
+                return penalized;
+            }
+        };
+    }
+
+
     @Override
     public ListFlowFileStatus getListFlowFileStatus(final String requestIdentifier) {
         return listRequestMap.get(requestIdentifier);
@@ -858,6 +1042,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
 
     @Override
     public ListFlowFileStatus cancelListFlowFileRequest(final String requestIdentifier) {
+        logger.info("Canceling ListFlowFile Request with ID {}", requestIdentifier);
         final ListFlowFileRequest request = listRequestMap.remove(requestIdentifier);
         if (request != null) {
             request.cancel();
@@ -867,8 +1052,41 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
     }
 
     @Override
-    public FlowFileRecord getFlowFile(String flowFileUuid) {
-        // TODO: Implement
+    public FlowFileRecord getFlowFile(final String flowFileUuid) throws IOException {
+        if (flowFileUuid == null) {
+            return null;
+        }
+
+        readLock.lock();
+        try {
+            // read through all of the FlowFiles in the queue, looking for the FlowFile with the given ID
+            for (final FlowFileRecord flowFile : activeQueue) {
+                if (flowFileUuid.equals(flowFile.getAttribute(CoreAttributes.UUID.key()))) {
+                    return flowFile;
+                }
+            }
+
+            for (final FlowFileRecord flowFile : swapQueue) {
+                if (flowFileUuid.equals(flowFile.getAttribute(CoreAttributes.UUID.key()))) {
+                    return flowFile;
+                }
+            }
+
+            // TODO: consider using a Long flowFileId instead of a UUID, and then having the swap manager
+            // write out the min and max FlowFile IDs. This would allow us to then have a method: boolean isFlowFilePossiblyContained(long id)
+            // whose result can be used to determine whether or not to even call peek for a given swap location.
+            for (final String swapLocation : swapLocations) {
+                final List<FlowFileRecord> flowFiles = swapManager.peek(swapLocation, this);
+                for (final FlowFileRecord flowFile : flowFiles) {
+                    if (flowFileUuid.equals(flowFile.getAttribute(CoreAttributes.UUID.key()))) {
+                        return flowFile;
+                    }
+                }
+            }
+        } finally {
+            readLock.unlock("getFlowFile");
+        }
+
         return null;
     }
 


[2/5] nifi git commit: Merge branch 'NIFI-108' of https://git-wip-us.apache.org/repos/asf/nifi into NIFI-108

Posted by ma...@apache.org.
Merge branch 'NIFI-108' of https://git-wip-us.apache.org/repos/asf/nifi into NIFI-108


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/64a415cb
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/64a415cb
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/64a415cb

Branch: refs/heads/NIFI-108
Commit: 64a415cb443ad03b10c12a5246338848ccfd1f80
Parents: 245d8d4 e762d3c
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Dec 17 13:04:19 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Dec 17 13:04:19 2015 -0500

----------------------------------------------------------------------
 .../apache/nifi/web/api/dto/DropRequestDTO.java |  11 +-
 .../nifi/web/api/dto/FlowFileSummaryDTO.java    | 134 +++++++++
 .../nifi/web/api/dto/ListingRequestDTO.java     | 169 +++++++++++
 .../web/api/entity/ListingRequestEntity.java    |  44 +++
 .../org/apache/nifi/web/NiFiServiceFacade.java  |  31 ++
 .../nifi/web/StandardNiFiServiceFacade.java     |  18 +-
 .../apache/nifi/web/api/ConnectionResource.java | 301 ++++++++++++++++++-
 .../org/apache/nifi/web/api/dto/DtoFactory.java |  46 +++
 .../org/apache/nifi/web/dao/ConnectionDAO.java  |  33 +-
 .../web/dao/impl/StandardConnectionDAO.java     |  36 ++-
 10 files changed, 807 insertions(+), 16 deletions(-)
----------------------------------------------------------------------



[3/5] nifi git commit: NIFI-108: Added merging of response for listing of flowfiles in cluster manager

Posted by ma...@apache.org.
NIFI-108: Added merging of response for listing of flowfiles in cluster manager


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/9408507e
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/9408507e
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/9408507e

Branch: refs/heads/NIFI-108
Commit: 9408507eb5733714537afb6266e6fe886609afdc
Parents: 64a415c
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Dec 17 16:46:12 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Dec 17 16:46:12 2015 -0500

----------------------------------------------------------------------
 .../nifi/controller/queue/FlowFileSummary.java  |   2 +-
 .../controller/queue/ListFlowFileStatus.java    |  15 +++
 .../nifi/controller/queue/SortColumn.java       |  64 +++++++++--
 .../nifi/web/api/dto/ListingRequestDTO.java     |  76 ++++++++++++-
 .../cluster/manager/impl/WebClusterManager.java | 111 ++++++++++++++++---
 .../controller/queue/ListFlowFileRequest.java   |  34 ++++--
 .../nifi/controller/FlowFileSummaries.java      |  95 ++++++++++++++++
 .../nifi/controller/StandardFlowFileQueue.java  |  16 ++-
 .../org/apache/nifi/web/api/dto/DtoFactory.java |  12 +-
 .../web/dao/impl/StandardConnectionDAO.java     |   2 +-
 10 files changed, 384 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/9408507e/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileSummary.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileSummary.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileSummary.java
index 8c37185..b7207f2 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileSummary.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileSummary.java
@@ -45,7 +45,7 @@ public interface FlowFileSummary {
     /**
      * @return the timestamp (in milliseconds since epoch) at which the FlowFile was added to the queue
      */
-    long lastQueuedTime();
+    long getLastQueuedTime();
 
     /**
      * @return the timestamp (in milliseconds since epoch) at which the FlowFile's greatest ancestor entered the flow

http://git-wip-us.apache.org/repos/asf/nifi/blob/9408507e/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java
index 5781283..cae500d 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java
@@ -22,6 +22,11 @@ import java.util.List;
 public interface ListFlowFileStatus {
 
     /**
+     * @return the maximum number of FlowFile Summary objects that should be returned
+     */
+    int getMaxResults();
+
+    /**
      * @return the identifier of the request to drop FlowFiles from the queue
      */
     String getRequestIdentifier();
@@ -72,4 +77,14 @@ public interface ListFlowFileStatus {
      * @return the percentage (an integer between 0 and 100, inclusive) of how close the request is to being completed
      */
     int getCompletionPercentage();
+
+    /**
+     * @return the total number of steps that are required in order to finish the listing
+     */
+    int getTotalStepCount();
+
+    /**
+     * @return the number of steps that have already been completed. The value returned will be between 0 and the result of calling {@link #getTotalStepCount()}, inclusive.
+     */
+    int getCompletedStepCount();
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/9408507e/nifi-api/src/main/java/org/apache/nifi/controller/queue/SortColumn.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/SortColumn.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/SortColumn.java
index 48f3e9f..30d285c 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/SortColumn.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/SortColumn.java
@@ -17,44 +17,92 @@
 
 package org.apache.nifi.controller.queue;
 
+import java.util.Comparator;
+
 /**
  * Specifies which column to sort on when performing a Listing of FlowFiles via
  * {@link FlowFileQueue#listFlowFiles(String, SortColumn, SortDirection)}
  */
-public enum SortColumn {
+public enum SortColumn implements Comparator<FlowFileSummary> {
     /**
      * Sort based on the current position in the queue
      */
-    QUEUE_POSITION,
+    QUEUE_POSITION (new Comparator<FlowFileSummary>() {
+        @Override
+        public int compare(final FlowFileSummary o1, final FlowFileSummary o2) {
+            return Integer.compare(o1.getPosition(), o2.getPosition());
+        }
+    }),
 
     /**
      * Sort based on the UUID of the FlowFile
      */
-    FLOWFILE_UUID,
+    FLOWFILE_UUID (new Comparator<FlowFileSummary>() {
+        @Override
+        public int compare(final FlowFileSummary o1, final FlowFileSummary o2) {
+            return o1.getUuid().compareTo(o2.getUuid());
+        }
+    }),
 
     /**
      * Sort based on the 'filename' attribute of the FlowFile
      */
-    FILENAME,
+    FILENAME (new Comparator<FlowFileSummary>() {
+        @Override
+        public int compare(final FlowFileSummary o1, final FlowFileSummary o2) {
+            return o1.getFilename().compareTo(o2.getFilename());
+        }
+    }),
 
     /**
      * Sort based on the size of the FlowFile
      */
-    FLOWFILE_SIZE,
+    FLOWFILE_SIZE(new Comparator<FlowFileSummary>() {
+        @Override
+        public int compare(final FlowFileSummary o1, final FlowFileSummary o2) {
+            return Long.compare(o1.getSize(), o2.getSize());
+        }
+    }),
 
     /**
      * Sort based on how long the FlowFile has been sitting in the queue
      */
-    QUEUED_DURATION,
+    QUEUED_DURATION (new Comparator<FlowFileSummary>() {
+        @Override
+        public int compare(final FlowFileSummary o1, final FlowFileSummary o2) {
+            return -Long.compare(o1.getLastQueuedTime(), o2.getLastQueuedTime());
+        }
+    }),
 
     /**
      * Sort based on the age of the FlowFile. I.e., the time at which the FlowFile's
      * "greatest ancestor" entered the flow
      */
-    FLOWFILE_AGE,
+    FLOWFILE_AGE (new Comparator<FlowFileSummary>() {
+        @Override
+        public int compare(final FlowFileSummary o1, final FlowFileSummary o2) {
+            return Long.compare(o1.getLineageStartDate(), o2.getLineageStartDate());
+        }
+    }),
 
     /**
      * Sort based on when the FlowFile's penalization ends
      */
-    PENALIZATION;
+    PENALIZATION (new Comparator<FlowFileSummary>() {
+        @Override
+        public int compare(final FlowFileSummary o1, final FlowFileSummary o2) {
+            return Boolean.compare(o1.isPenalized(), o2.isPenalized());
+        }
+    });
+
+    private final Comparator<FlowFileSummary> comparator;
+
+    private SortColumn(final Comparator<FlowFileSummary> comparator) {
+        this.comparator = comparator;
+    }
+
+    @Override
+    public int compare(final FlowFileSummary o1, final FlowFileSummary o2) {
+        return comparator.compare(o1, o2);
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/9408507e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ListingRequestDTO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ListingRequestDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ListingRequestDTO.java
index 53c2a74..36c0518 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ListingRequestDTO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ListingRequestDTO.java
@@ -16,13 +16,15 @@
  */
 package org.apache.nifi.web.api.dto;
 
-import com.wordnik.swagger.annotations.ApiModelProperty;
-import org.apache.nifi.web.api.dto.util.TimestampAdapter;
-
-import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
 import java.util.Date;
 import java.util.List;
 
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+
+import org.apache.nifi.web.api.dto.util.TimestampAdapter;
+
+import com.wordnik.swagger.annotations.ApiModelProperty;
+
 public class ListingRequestDTO {
 
     private String id;
@@ -34,6 +36,11 @@ public class ListingRequestDTO {
     private Integer percentCompleted;
     private Boolean finished;
     private String failureReason;
+    private String sortColumn;
+    private String sortDirection;
+    private Integer maxResults;
+    private Integer totalStepCount;
+    private Integer completedStepCount;
 
     private String state;
 
@@ -166,4 +173,65 @@ public class ListingRequestDTO {
     public void setFlowFileSummaries(List<FlowFileSummaryDTO> flowFileSummaries) {
         this.flowFileSummaries = flowFileSummaries;
     }
+
+    /**
+     * @return the column on which the listing is sorted
+     */
+    @ApiModelProperty(value = "The column on which the FlowFiles are sorted.")
+    public String getSortColumn() {
+        return sortColumn;
+    }
+
+    public void setSortColumn(String sortColumn) {
+        this.sortColumn = sortColumn;
+    }
+
+    /**
+     * @return the direction in which the FlowFiles are sorted
+     */
+    @ApiModelProperty(value = "The direction in which the FlowFiles are sorted. Either ASCENDING or DESCENDING.")
+    public String getSortDirection() {
+        return sortDirection;
+    }
+
+    public void setSortDirection(String sortDirection) {
+        this.sortDirection = sortDirection;
+    }
+
+    /**
+     * @return the maximum number of FlowFileSummary objects to return
+     */
+    @ApiModelProperty(value = "The maximum number of FlowFileSummary objects to return")
+    public Integer getMaxResults() {
+        return maxResults;
+    }
+
+    public void setMaxResults(Integer maxResults) {
+        this.maxResults = maxResults;
+    }
+
+
+    /**
+     * @return the total number of steps required to complete the listing
+     */
+    @ApiModelProperty(value = "The total number of steps required to complete the listing")
+    public Integer getTotalStepCount() {
+        return totalStepCount;
+    }
+
+    public void setTotalStepCount(Integer totalStepCount) {
+        this.totalStepCount = totalStepCount;
+    }
+
+    /**
+     * @return the number of steps that have already been completed. This value will be between 0 and the total step count (inclusive)
+     */
+    @ApiModelProperty(value = "The number of steps that have already been completed. This value will be between 0 and the total step count (inclusive)")
+    public Integer getCompletedStepCount() {
+        return completedStepCount;
+    }
+
+    public void setCompletedStepCount(Integer completedStepCount) {
+        this.completedStepCount = completedStepCount;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/9408507e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index 035a888..6b43f9d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -34,11 +34,13 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
+import java.util.NavigableSet;
 import java.util.Queue;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ConcurrentHashMap;
@@ -125,6 +127,7 @@ import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
 import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.FlowFileSummaries;
 import org.apache.nifi.controller.Heartbeater;
 import org.apache.nifi.controller.ReportingTaskNode;
 import org.apache.nifi.controller.ScheduledState;
@@ -132,6 +135,10 @@ import org.apache.nifi.controller.StandardFlowSerializer;
 import org.apache.nifi.controller.StandardProcessorNode;
 import org.apache.nifi.controller.ValidationContextFactory;
 import org.apache.nifi.controller.exception.ComponentLifeCycleException;
+import org.apache.nifi.controller.queue.DropFlowFileState;
+import org.apache.nifi.controller.queue.ListFlowFileState;
+import org.apache.nifi.controller.queue.SortColumn;
+import org.apache.nifi.controller.queue.SortDirection;
 import org.apache.nifi.controller.reporting.ClusteredReportingTaskNode;
 import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
 import org.apache.nifi.controller.reporting.ReportingTaskProvider;
@@ -142,6 +149,7 @@ import org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent;
 import org.apache.nifi.controller.service.ControllerServiceLoader;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.controller.service.ControllerServiceState;
 import org.apache.nifi.controller.service.StandardControllerServiceProvider;
 import org.apache.nifi.controller.status.ProcessGroupStatus;
 import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
@@ -192,13 +200,19 @@ import org.apache.nifi.util.ReflectionUtils;
 import org.apache.nifi.web.OptimisticLockingManager;
 import org.apache.nifi.web.Revision;
 import org.apache.nifi.web.UpdateRevision;
+import org.apache.nifi.web.api.dto.ControllerServiceDTO;
+import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
+import org.apache.nifi.web.api.dto.DropRequestDTO;
+import org.apache.nifi.web.api.dto.FlowFileSummaryDTO;
 import org.apache.nifi.web.api.dto.FlowSnippetDTO;
+import org.apache.nifi.web.api.dto.ListingRequestDTO;
 import org.apache.nifi.web.api.dto.NodeDTO;
 import org.apache.nifi.web.api.dto.ProcessGroupDTO;
 import org.apache.nifi.web.api.dto.ProcessorDTO;
 import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO;
 import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
 import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
+import org.apache.nifi.web.api.dto.ReportingTaskDTO;
 import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO;
 import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO;
 import org.apache.nifi.web.api.dto.provenance.ProvenanceRequestDTO;
@@ -207,7 +221,12 @@ import org.apache.nifi.web.api.dto.status.ClusterStatusHistoryDTO;
 import org.apache.nifi.web.api.dto.status.NodeStatusHistoryDTO;
 import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
 import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO;
+import org.apache.nifi.web.api.entity.ControllerServiceEntity;
+import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity;
+import org.apache.nifi.web.api.entity.ControllerServicesEntity;
+import org.apache.nifi.web.api.entity.DropRequestEntity;
 import org.apache.nifi.web.api.entity.FlowSnippetEntity;
+import org.apache.nifi.web.api.entity.ListingRequestEntity;
 import org.apache.nifi.web.api.entity.ProcessGroupEntity;
 import org.apache.nifi.web.api.entity.ProcessorEntity;
 import org.apache.nifi.web.api.entity.ProcessorsEntity;
@@ -215,6 +234,8 @@ import org.apache.nifi.web.api.entity.ProvenanceEntity;
 import org.apache.nifi.web.api.entity.ProvenanceEventEntity;
 import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
 import org.apache.nifi.web.api.entity.RemoteProcessGroupsEntity;
+import org.apache.nifi.web.api.entity.ReportingTaskEntity;
+import org.apache.nifi.web.api.entity.ReportingTasksEntity;
 import org.apache.nifi.web.util.WebUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -226,19 +247,6 @@ import org.xml.sax.SAXException;
 import org.xml.sax.SAXParseException;
 
 import com.sun.jersey.api.client.ClientResponse;
-import org.apache.nifi.controller.queue.DropFlowFileState;
-
-import org.apache.nifi.controller.service.ControllerServiceState;
-import org.apache.nifi.web.api.dto.ControllerServiceDTO;
-import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
-import org.apache.nifi.web.api.dto.DropRequestDTO;
-import org.apache.nifi.web.api.dto.ReportingTaskDTO;
-import org.apache.nifi.web.api.entity.ControllerServiceEntity;
-import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity;
-import org.apache.nifi.web.api.entity.ControllerServicesEntity;
-import org.apache.nifi.web.api.entity.DropRequestEntity;
-import org.apache.nifi.web.api.entity.ReportingTaskEntity;
-import org.apache.nifi.web.api.entity.ReportingTasksEntity;
 
 /**
  * Provides a cluster manager implementation. The manager federates incoming HTTP client requests to the nodes' external API using the HTTP protocol. The manager also communicates with nodes using the
@@ -319,6 +327,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
     public static final String REPORTING_TASKS_URI = "/nifi-api/controller/reporting-tasks/node";
     public static final Pattern REPORTING_TASK_URI_PATTERN = Pattern.compile("/nifi-api/controller/reporting-tasks/node/[a-f0-9\\-]{36}");
 
+    @Deprecated
     public static final Pattern QUEUE_CONTENTS_URI = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/contents");
     public static final Pattern DROP_REQUEST_URI = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/drop-requests/[a-f0-9\\-]{36}");
     public static final Pattern LIST_FLOWFILES_URI = Pattern
@@ -2831,6 +2840,64 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         return normalizedValidationErrors;
     }
 
+
+    /**
+     * Merges the listing requests in the specified map into the specified listing request.
+     *
+     * @param listingRequest the target listing request
+     * @param listingRequestMap the mapping of all responses being merged
+     */
+    private void mergeListingRequests(final ListingRequestDTO listingRequest, final Map<NodeIdentifier, ListingRequestDTO> listingRequestMap) {
+        final Comparator<FlowFileSummaryDTO> comparator = FlowFileSummaries.createDTOComparator(
+            SortColumn.valueOf(listingRequest.getSortColumn()), SortDirection.valueOf(listingRequest.getSortDirection()));
+
+        final NavigableSet<FlowFileSummaryDTO> flowFileSummaries = new TreeSet<>(comparator);
+
+        ListFlowFileState state = null;
+        int sumOfPercents = 0;
+        boolean finished = true;
+        for (final ListingRequestDTO nodeRequest : listingRequestMap.values()) {
+            Integer percentComplete = nodeRequest.getPercentCompleted();
+            if (percentComplete != null) {
+                sumOfPercents += percentComplete;
+            }
+
+            if (!nodeRequest.getFinished()) {
+                finished = false;
+            }
+
+            if (nodeRequest.getLastUpdated().after(listingRequest.getLastUpdated())) {
+                listingRequest.setLastUpdated(nodeRequest.getLastUpdated());
+            }
+
+            // Keep the state with the lowest ordinal value (the "least completed").
+            final ListFlowFileState nodeState = ListFlowFileState.valueOfDescription(nodeRequest.getState());
+            if (state == null || state.compareTo(nodeState) > 0) {
+                state = nodeState;
+            }
+
+            for (final FlowFileSummaryDTO summaryDTO : nodeRequest.getFlowFileSummaries()) {
+                flowFileSummaries.add(summaryDTO);
+
+                // Keep the set from growing beyond our max
+                if (flowFileSummaries.size() > listingRequest.getMaxResults()) {
+                    flowFileSummaries.pollLast();
+                }
+            }
+
+            if (nodeRequest.getFailureReason() != null) {
+                listingRequest.setFailureReason(nodeRequest.getFailureReason());
+            }
+        }
+
+        final List<FlowFileSummaryDTO> summaryDTOs = new ArrayList<>(flowFileSummaries);
+        listingRequest.setFlowFileSummaries(summaryDTOs);
+
+        final int percentCompleted = sumOfPercents / listingRequestMap.size();
+        listingRequest.setPercentCompleted(percentCompleted);
+        listingRequest.setFinished(finished);
+    }
+
     /**
      * Merges the drop requests in the specified map into the specified drop request.
      *
@@ -3316,7 +3383,23 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
 
             clientResponse = new NodeResponse(clientResponse, responseEntity);
         } else if (hasSuccessfulClientResponse && isListFlowFilesEndpoint(uri, method)) {
-            // TODO: IMPLEMENT
+            final ListingRequestEntity responseEntity = clientResponse.getClientResponse().getEntity(ListingRequestEntity.class);
+            final ListingRequestDTO listingRequest = responseEntity.getListingRequest();
+
+            final Map<NodeIdentifier, ListingRequestDTO> resultsMap = new HashMap<>();
+            for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
+                if (problematicNodeResponses.contains(nodeResponse)) {
+                    continue;
+                }
+
+                final ListingRequestEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ListingRequestEntity.class);
+                final ListingRequestDTO nodeListingRequest = nodeResponseEntity.getListingRequest();
+
+                resultsMap.put(nodeResponse.getNodeId(), nodeListingRequest);
+            }
+            mergeListingRequests(listingRequest, resultsMap);
+
+            clientResponse = new NodeResponse(clientResponse, responseEntity);
         } else {
             if (!nodeResponsesToDrain.isEmpty()) {
                 drainResponses(nodeResponsesToDrain);

http://git-wip-us.apache.org/repos/asf/nifi/blob/9408507e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java
index e83fd80..aad4c4f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java
@@ -23,6 +23,7 @@ import java.util.List;
 
 public class ListFlowFileRequest implements ListFlowFileStatus {
     private final String requestId;
+    private final int maxResults;
     private final QueueSize queueSize;
     private final SortColumn sortColumn;
     private final SortDirection sortDirection;
@@ -32,13 +33,14 @@ public class ListFlowFileRequest implements ListFlowFileStatus {
     private ListFlowFileState state;
     private String failureReason;
     private int numSteps;
-    private int numCompletedSteps;
+    private int completedStepCount;
     private long lastUpdated = System.currentTimeMillis();
 
-    public ListFlowFileRequest(final String requestId, final SortColumn sortColumn, final SortDirection sortDirection, final QueueSize queueSize, final int numSteps) {
+    public ListFlowFileRequest(final String requestId, final SortColumn sortColumn, final SortDirection sortDirection, final int maxResults, final QueueSize queueSize, final int numSteps) {
         this.requestId = requestId;
         this.sortColumn = sortColumn;
         this.sortDirection = sortDirection;
+        this.maxResults = maxResults;
         this.queueSize = queueSize;
         this.numSteps = numSteps;
     }
@@ -94,11 +96,10 @@ public class ListFlowFileRequest implements ListFlowFileStatus {
         return Collections.unmodifiableList(flowFileSummaries);
     }
 
-    public synchronized void addFlowFileSummaries(final List<FlowFileSummary> summaries) {
-        // TODO: Implement.
-
+    public synchronized void setFlowFileSummaries(final List<FlowFileSummary> summaries) {
+        this.flowFileSummaries.clear();
+        this.flowFileSummaries.addAll(summaries);
         lastUpdated = System.currentTimeMillis();
-        this.numCompletedSteps++;
     }
 
     @Override
@@ -117,6 +118,25 @@ public class ListFlowFileRequest implements ListFlowFileStatus {
 
     @Override
     public synchronized int getCompletionPercentage() {
-        return (int) (100F * numCompletedSteps / numSteps);
+        return (int) (100F * completedStepCount / numSteps);
+    }
+
+    public synchronized void setCompletedStepCount(final int completedStepCount) {
+        this.completedStepCount = completedStepCount;
+    }
+
+    @Override
+    public int getMaxResults() {
+        return maxResults;
+    }
+
+    @Override
+    public int getTotalStepCount() {
+        return numSteps;
+    }
+
+    @Override
+    public int getCompletedStepCount() {
+        return completedStepCount;
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/9408507e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowFileSummaries.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowFileSummaries.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowFileSummaries.java
new file mode 100644
index 0000000..5a0a3ab
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowFileSummaries.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller;
+
+import java.util.Collections;
+import java.util.Comparator;
+
+import org.apache.nifi.controller.queue.FlowFileSummary;
+import org.apache.nifi.controller.queue.SortColumn;
+import org.apache.nifi.controller.queue.SortDirection;
+import org.apache.nifi.web.api.dto.FlowFileSummaryDTO;
+
+public class FlowFileSummaries { // static factories for Comparators over FlowFileSummary and FlowFileSummaryDTO
+    /** Returns a Comparator that orders FlowFileSummary objects by {@code column}; the order is reversed when {@code direction} is DESCENDING. */
+    public static Comparator<FlowFileSummary> createComparator(final SortColumn column, final SortDirection direction) {
+        final Comparator<FlowFileSummary> comparator = new Comparator<FlowFileSummary>() {
+            @Override
+            public int compare(final FlowFileSummary o1, final FlowFileSummary o2) {
+                switch (column) { // a null column fails here with a NullPointerException, i.e. only once compare() is invoked
+                    case FILENAME:
+                        return o1.getFilename().compareTo(o2.getFilename());
+                    case FLOWFILE_AGE: // compared by lineage start date, so an earlier start date sorts first
+                        return Long.compare(o1.getLineageStartDate(), o2.getLineageStartDate());
+                    case FLOWFILE_SIZE:
+                        return Long.compare(o1.getSize(), o2.getSize());
+                    case FLOWFILE_UUID:
+                        return o1.getUuid().compareTo(o2.getUuid());
+                    case PENALIZATION: // Boolean.compare places false before true
+                        return Boolean.compare(o1.isPenalized(), o2.isPenalized());
+                    case QUEUE_POSITION:
+                        return Long.compare(o1.getPosition(), o2.getPosition());
+                    case QUEUED_DURATION: // compared by last queued time, so an earlier time sorts first
+                        return Long.compare(o1.getLastQueuedTime(), o2.getLastQueuedTime());
+                }
+                // No case matched the column: treat the two summaries as equal.
+                return 0;
+            }
+        };
+
+        // The comparator above gives the natural order of the chosen field; only DESCENDING flips it (any other direction, including null, does not).
+        if (direction == SortDirection.DESCENDING) {
+            return Collections.reverseOrder(comparator);
+        } else {
+            return comparator;
+        }
+    }
+    /** DTO counterpart of createComparator: orders FlowFileSummaryDTO objects by {@code column}, reversed when {@code direction} is DESCENDING. */
+    public static Comparator<FlowFileSummaryDTO> createDTOComparator(final SortColumn column, final SortDirection direction) {
+        final Comparator<FlowFileSummaryDTO> comparator = new Comparator<FlowFileSummaryDTO>() {
+            @Override
+            public int compare(final FlowFileSummaryDTO o1, final FlowFileSummaryDTO o2) {
+                switch (column) { // NOTE(review): DTO getters are dereferenced/unboxed below without null checks - confirm the fields are always populated
+                    case FILENAME:
+                        return o1.getFilename().compareTo(o2.getFilename());
+                    case FLOWFILE_AGE: // presumably java.util.Date values - TODO confirm against FlowFileSummaryDTO
+                        return o1.getLinageStartDate().compareTo(o2.getLinageStartDate());
+                    case FLOWFILE_SIZE:
+                        return Long.compare(o1.getSize(), o2.getSize());
+                    case FLOWFILE_UUID:
+                        return o1.getUuid().compareTo(o2.getUuid());
+                    case PENALIZATION: // NOTE(review): if getPenalized() returns a boxed Boolean, a null would throw on unboxing - confirm
+                        return Boolean.compare(o1.getPenalized(), o2.getPenalized());
+                    case QUEUE_POSITION:
+                        return Long.compare(o1.getPosition(), o2.getPosition());
+                    case QUEUED_DURATION: // compared by last queued time, so an earlier time sorts first
+                        return o1.getLastQueuedTime().compareTo(o2.getLastQueuedTime());
+                }
+                // No case matched the column: treat the two DTOs as equal.
+                return 0;
+            }
+        };
+        // Only DESCENDING reverses the comparator; any other direction (including null) returns it unchanged.
+        if (direction == SortDirection.DESCENDING) {
+            return Collections.reverseOrder(comparator);
+        } else {
+            return comparator;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/9408507e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index 9935ba4..daaa763 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -894,12 +894,13 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
         final int numSteps;
         readLock.lock();
         try {
-            numSteps = 1 + swapLocations.size();
+            // numSteps = 1 for each swap location + 1 for active queue + 1 for swap queue.
+            numSteps = 2 + swapLocations.size();
         } finally {
             readLock.unlock("listFlowFiles");
         }
 
-        final ListFlowFileRequest listRequest = new ListFlowFileRequest(requestIdentifier, sortColumn, direction, size(), numSteps);
+        final ListFlowFileRequest listRequest = new ListFlowFileRequest(requestIdentifier, sortColumn, direction, maxResults, size(), numSteps);
 
         final Thread t = new Thread(new Runnable() {
             @Override
@@ -907,6 +908,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
                 int position = 0;
                 int resultCount = 0;
                 final List<FlowFileSummary> summaries = new ArrayList<>(activeQueue.size());
+                int completedStepCount = 0;
 
                 // we need a write lock while using the Active Queue because we can't iterate over it - we have to poll from it
                 // continually. This is because the iterator for PriorityQueue does not iterate over the elements in any particular
@@ -935,6 +937,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
                     writeLock.unlock("List FlowFiles");
                 }
 
+                listRequest.setCompletedStepCount(++completedStepCount);
+
                 position = activeQueue.size();
                 sourceLoop: while (resultCount < maxResults) {
                     try {
@@ -954,6 +958,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
                                         }
                                     }
                                 }
+
+                                listRequest.setCompletedStepCount(++completedStepCount);
                             }
 
                             for (final FlowFileRecord flowFile : swapQueue) {
@@ -966,6 +972,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
                                     }
                                 }
                             }
+
+                            listRequest.setCompletedStepCount(++completedStepCount);
                         } finally {
                             readLock.unlock("List FlowFiles");
                         }
@@ -974,6 +982,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
                         listRequest.setFailure("Could not read FlowFiles from queue. Check log files for more details.");
                     }
                 }
+
+                listRequest.setFlowFileSummaries(summaries);
             }
         }, "List FlowFiles for Connection " + getIdentifier());
         t.setDaemon(true);
@@ -1018,7 +1028,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
             }
 
             @Override
-            public long lastQueuedTime() {
+            public long getLastQueuedTime() {
                 return lastQueuedTime;
             }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/9408507e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index b93ff95..0805b54 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -360,10 +360,14 @@ public final class DtoFactory {
         dto.setState(listingRequest.getState().toString());
         dto.setFailureReason(listingRequest.getFailureReason());
         dto.setFinished(isListingRequestComplete(listingRequest.getState()));
+        dto.setMaxResults(listingRequest.getMaxResults());
+        dto.setSortColumn(listingRequest.getSortColumn().name());
+        dto.setSortDirection(listingRequest.getSortDirection().name());
+        dto.setTotalStepCount(listingRequest.getTotalStepCount());
+        dto.setCompletedStepCount(listingRequest.getCompletedStepCount());
+        dto.setPercentCompleted(listingRequest.getCompletionPercentage());
 
         if (isListingRequestComplete(listingRequest.getState())) {
-            dto.setPercentCompleted(100);
-
             final List<FlowFileSummary> flowFileSummaries = listingRequest.getFlowFileSummaries();
             if (flowFileSummaries != null) {
                 final List<FlowFileSummaryDTO> summaryDtos = new ArrayList<>(flowFileSummaries.size());
@@ -372,8 +376,6 @@ public final class DtoFactory {
                 }
                 dto.setFlowFileSummaries(summaryDtos);
             }
-        } else {
-            dto.setPercentCompleted(50);
         }
 
         return dto;
@@ -383,7 +385,7 @@ public final class DtoFactory {
         final FlowFileSummaryDTO dto = new FlowFileSummaryDTO();
         dto.setUuid(summary.getUuid());
         dto.setFilename(summary.getFilename());
-        dto.setLastQueuedTime(new Date(summary.lastQueuedTime()));
+        dto.setLastQueuedTime(new Date(summary.getLastQueuedTime()));
         dto.setLinageStartDate(new Date(summary.getLineageStartDate()));
         dto.setPenalized(summary.isPenalized());
         dto.setPosition(summary.getPosition());

http://git-wip-us.apache.org/repos/asf/nifi/blob/9408507e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
index 0e9a90a..459c2b5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
@@ -342,7 +342,7 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO
     public ListFlowFileStatus createFlowFileListingRequest(String groupId, String id, String listingRequestId) {
         final Connection connection = locateConnection(groupId, id);
         final FlowFileQueue queue = connection.getFlowFileQueue();
-        return queue.listFlowFiles(listingRequestId);
+        return queue.listFlowFiles(listingRequestId, 100); // NOTE(review): 100 is presumably the maxResults cap and is hard-coded here - confirm, and consider a named constant
     }
 
     @Override


[4/5] nifi git commit: Merge branch 'NIFI-108' of https://git-wip-us.apache.org/repos/asf/nifi into NIFI-108

Posted by ma...@apache.org.
Merge branch 'NIFI-108' of https://git-wip-us.apache.org/repos/asf/nifi into NIFI-108


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/f4ef8847
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/f4ef8847
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/f4ef8847

Branch: refs/heads/NIFI-108
Commit: f4ef8847571889aa4627654a912bdbb7308865bf
Parents: 9408507 6707337
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Dec 17 16:46:14 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Dec 17 16:46:14 2015 -0500

----------------------------------------------------------------------
 .../apache/nifi/web/api/dto/FlowFileDTO.java    |  40 +++
 .../nifi/web/api/dto/FlowFileSummaryDTO.java    |  32 ++
 .../nifi/web/api/entity/FlowFileEntity.java     |  44 +++
 .../org/apache/nifi/web/NiFiServiceFacade.java  |  22 ++
 .../nifi/web/StandardNiFiServiceFacade.java     |  11 +
 .../apache/nifi/web/api/ConnectionResource.java | 333 ++++++++++++++++---
 .../org/apache/nifi/web/api/dto/DtoFactory.java |  14 +
 .../org/apache/nifi/web/dao/ConnectionDAO.java  |  25 +-
 .../web/dao/impl/StandardConnectionDAO.java     |  79 +++++
 9 files changed, 556 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/f4ef8847/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/nifi/blob/f4ef8847/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
----------------------------------------------------------------------


[5/5] nifi git commit: NIFI-108: Add clusterNodeId to FlowFileSummaryDTO

Posted by ma...@apache.org.
NIFI-108: Add clusterNodeId to FlowFileSummaryDTO


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b12aba78
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b12aba78
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b12aba78

Branch: refs/heads/NIFI-108
Commit: b12aba7829e91b228568e5a3aa75558c3c227247
Parents: f4ef884
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Dec 17 16:54:13 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Dec 17 16:54:13 2015 -0500

----------------------------------------------------------------------
 .../org/apache/nifi/cluster/manager/impl/WebClusterManager.java  | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/b12aba78/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index 6b43f9d..210cf52 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -2856,7 +2856,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         ListFlowFileState state = null;
         int sumOfPercents = 0;
         boolean finished = true;
-        for (final ListingRequestDTO nodeRequest : listingRequestMap.values()) {
+        for (final Map.Entry<NodeIdentifier, ListingRequestDTO> entry : listingRequestMap.entrySet()) {
+            final ListingRequestDTO nodeRequest = entry.getValue();
             Integer percentComplete = nodeRequest.getPercentCompleted();
             if (percentComplete != null) {
                 sumOfPercents += percentComplete;
@@ -2877,6 +2878,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
             }
 
             for (final FlowFileSummaryDTO summaryDTO : nodeRequest.getFlowFileSummaries()) {
+                summaryDTO.setClusterNodeId(entry.getKey().getId());
                 flowFileSummaries.add(summaryDTO);
 
                 // Keep the set from growing beyond our max