You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/12/17 22:54:20 UTC
[1/5] nifi git commit: NIFI-108: Implementing ability to list
FlowFiles in a queue
Repository: nifi
Updated Branches:
refs/heads/NIFI-108 670733753 -> b12aba782
NIFI-108: Implementing ability to list FlowFiles in a queue
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/245d8d4d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/245d8d4d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/245d8d4d
Branch: refs/heads/NIFI-108
Commit: 245d8d4d2d3fbd6182e7e587d3433cc7378cc37a
Parents: 2e22954
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Dec 17 13:00:30 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Dec 17 13:00:30 2015 -0500
----------------------------------------------------------------------
.../nifi/controller/queue/FlowFileQueue.java | 33 ++-
.../controller/queue/ListFlowFileStatus.java | 5 +
.../cluster/manager/impl/WebClusterManager.java | 8 +
.../controller/queue/ListFlowFileRequest.java | 79 +++++--
.../apache/nifi/controller/FlowController.java | 39 +++-
.../nifi/controller/StandardFlowFileQueue.java | 234 ++++++++++++++++++-
6 files changed, 363 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/245d8d4d/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
index fe8649d..dbf2f04 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.controller.queue;
+import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
@@ -217,12 +218,32 @@ public interface FlowFileQueue {
* will be returned ordered by the position of the FlowFile in the queue.
*
* @param requestIdentifier the identifier of the List FlowFile Request
+ * @param maxResults the maximum number of FlowFileSummary objects to add to the ListFlowFileStatus
+ *
+ * @return the status for the request
+ *
+ * @throws IllegalStateException if either the source or the destination of the connection to which this queue belongs
+ * is currently running.
+ */
+ ListFlowFileStatus listFlowFiles(String requestIdentifier, int maxResults);
+
+ /**
+ * Initiates a request to obtain a listing of FlowFiles in this queue. This method returns a
+ * ListFlowFileStatus that can be used to obtain information about the FlowFiles that exist
+ * within the queue. Additionally, the ListFlowFileStatus provides a request identifier that
+ * can then be passed to the {@link #getListFlowFileStatus(String)} method to retrieve the current status of the request.
+ *
+ * @param requestIdentifier the identifier of the List FlowFile Request
+ * @param maxResults the maximum number of FlowFileSummary objects to add to the ListFlowFileStatus
+ * @param sortColumn specifies which column to sort on
+ * @param direction specifies which direction to sort the FlowFiles
+ *
* @return the status for the request
*
* @throws IllegalStateException if either the source or the destination of the connection to which this queue belongs
* is currently running.
*/
- ListFlowFileStatus listFlowFiles(String requestIdentifier);
+ ListFlowFileStatus listFlowFiles(String requestIdentifier, int maxResults, SortColumn sortColumn, SortDirection direction);
/**
* Initiates a request to obtain a listing of FlowFiles in this queue. This method returns a
@@ -231,6 +252,9 @@ public interface FlowFileQueue {
* can then be passed to the {@link #getListFlowFileStatus(String)}
*
* @param requestIdentifier the identifier of the List FlowFile Request
+ * @param maxResults the maximum number of FlowFileSummary objects to add to the ListFlowFileStatus
+ * @param query an Expression Language expression that will be evaluated against all FlowFiles. Only FlowFiles that satisfy the expression will
+ * be included in the results. The expression must be a valid expression and return a Boolean type
* @param sortColumn specifies which column to sort on
* @param direction specifies which direction to sort the FlowFiles
*
@@ -238,8 +262,9 @@ public interface FlowFileQueue {
*
* @throws IllegalStateException if either the source or the destination of the connection to which this queue belongs
* is currently running.
+ * @throws IllegalArgumentException if query is not a valid Expression Language expression or does not return a boolean type
*/
- ListFlowFileStatus listFlowFiles(String requestIdentifier, SortColumn sortColumn, SortDirection direction);
+ ListFlowFileStatus listFlowFiles(String requestIdentifier, int maxResults, String query, SortColumn sortColumn, SortDirection direction);
/**
* Returns the current status of a List FlowFile Request that was initiated via the {@link #listFlowFiles(String)}
@@ -269,6 +294,8 @@ public interface FlowFileQueue {
* @param flowFileUuid the UUID of the FlowFile to retrieve
* @return the FlowFile with the given UUID or <code>null</code> if no FlowFile can be found in this queue
* with the given UUID
+ *
+ * @throws IOException if unable to read FlowFiles that are stored on some external device
*/
- FlowFileRecord getFlowFile(String flowFileUuid);
+ FlowFileRecord getFlowFile(String flowFileUuid) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/245d8d4d/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java
index 2959170..5781283 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java
@@ -67,4 +67,9 @@ public interface ListFlowFileStatus {
* @return a List of FlowFileSummary objects
*/
List<FlowFileSummary> getFlowFileSummaries();
+
+ /**
+ * @return the percentage (an integer between 0 and 100, inclusive) of how close the request is to being completed
+ */
+ int getCompletionPercentage();
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/245d8d4d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index 5e9dd3c..035a888 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -321,6 +321,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
public static final Pattern QUEUE_CONTENTS_URI = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/contents");
public static final Pattern DROP_REQUEST_URI = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/drop-requests/[a-f0-9\\-]{36}");
+ public static final Pattern LIST_FLOWFILES_URI = Pattern
+ .compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/listing-requests/[a-f0-9\\-]{36}");
private final NiFiProperties properties;
private final HttpRequestReplicator httpRequestReplicator;
@@ -2431,6 +2433,10 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
return "GET".equalsIgnoreCase(method) && PROVENANCE_EVENT_URI.matcher(uri.getPath()).matches();
}
+ private static boolean isListFlowFilesEndpoint(final URI uri, final String method) {
+ return "GET".equalsIgnoreCase(method) && LIST_FLOWFILES_URI.matcher(uri.getPath()).matches();
+ }
+
private static boolean isCountersEndpoint(final URI uri) {
return COUNTERS_URI.matcher(uri.getPath()).matches();
}
@@ -3309,6 +3315,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
mergeDropRequests(dropRequest, resultsMap);
clientResponse = new NodeResponse(clientResponse, responseEntity);
+ } else if (hasSuccessfulClientResponse && isListFlowFilesEndpoint(uri, method)) {
+ // TODO: IMPLEMENT
} else {
if (!nodeResponsesToDrain.isEmpty()) {
drainResponses(nodeResponsesToDrain);
http://git-wip-us.apache.org/repos/asf/nifi/blob/245d8d4d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java
index 03e0188..e83fd80 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java
@@ -17,63 +17,93 @@
package org.apache.nifi.controller.queue;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
public class ListFlowFileRequest implements ListFlowFileStatus {
+ private final String requestId;
+ private final QueueSize queueSize;
+ private final SortColumn sortColumn;
+ private final SortDirection sortDirection;
+ private final long submissionTime = System.currentTimeMillis();
+ private final List<FlowFileSummary> flowFileSummaries = new ArrayList<>();
+
private ListFlowFileState state;
+ private String failureReason;
+ private int numSteps;
+ private int numCompletedSteps;
+ private long lastUpdated = System.currentTimeMillis();
+
+ public ListFlowFileRequest(final String requestId, final SortColumn sortColumn, final SortDirection sortDirection, final QueueSize queueSize, final int numSteps) {
+ this.requestId = requestId;
+ this.sortColumn = sortColumn;
+ this.sortDirection = sortDirection;
+ this.queueSize = queueSize;
+ this.numSteps = numSteps;
+ }
@Override
public String getRequestIdentifier() {
- // TODO Auto-generated method stub
- return null;
+ return requestId;
}
@Override
public long getRequestSubmissionTime() {
- // TODO Auto-generated method stub
- return 0;
+ return submissionTime;
}
@Override
- public long getLastUpdated() {
- // TODO Auto-generated method stub
- return 0;
+ public synchronized long getLastUpdated() {
+ return lastUpdated;
}
@Override
public SortColumn getSortColumn() {
- // TODO Auto-generated method stub
- return null;
+ return sortColumn;
}
@Override
public SortDirection getSortDirection() {
- // TODO Auto-generated method stub
- return null;
+ return sortDirection;
}
@Override
- public ListFlowFileState getState() {
- // TODO Auto-generated method stub
- return null;
+ public synchronized ListFlowFileState getState() {
+ return state;
}
@Override
- public String getFailureReason() {
- // TODO Auto-generated method stub
- return null;
+ public synchronized String getFailureReason() {
+ return failureReason;
+ }
+
+ public synchronized void setState(final ListFlowFileState state) {
+ this.state = state;
+ this.lastUpdated = System.currentTimeMillis();
+ }
+
+ public synchronized void setFailure(final String explanation) {
+ this.state = ListFlowFileState.FAILURE;
+ this.failureReason = explanation;
+ this.lastUpdated = System.currentTimeMillis();
}
@Override
- public List<FlowFileSummary> getFlowFileSummaries() {
- // TODO Auto-generated method stub
- return null;
+ public synchronized List<FlowFileSummary> getFlowFileSummaries() {
+ return Collections.unmodifiableList(flowFileSummaries);
+ }
+
+ public synchronized void addFlowFileSummaries(final List<FlowFileSummary> summaries) {
+ // TODO: Implement.
+
+ lastUpdated = System.currentTimeMillis();
+ this.numCompletedSteps++;
}
@Override
public QueueSize getQueueSize() {
- // TODO Auto-generated method stub
- return null;
+ return queueSize;
}
public synchronized boolean cancel() {
@@ -84,4 +114,9 @@ public class ListFlowFileRequest implements ListFlowFileStatus {
this.state = ListFlowFileState.CANCELED;
return true;
}
+
+ @Override
+ public synchronized int getCompletionPercentage() {
+ return (int) (100F * numCompletedSteps / numSteps);
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/245d8d4d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 2ad102f..dd3b687 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -177,6 +177,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext;
+
+import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -3242,8 +3244,41 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
public InputStream getContent(final FlowFileRecord flowFile, final String requestor, final String requestUri) throws IOException {
- // TODO: IMPLEMENT
- return null;
+ requireNonNull(flowFile);
+ requireNonNull(requestor);
+ requireNonNull(requestUri);
+
+ final InputStream stream;
+ final ResourceClaim resourceClaim;
+ final ContentClaim contentClaim = flowFile.getContentClaim();
+ if (contentClaim == null) {
+ resourceClaim = null;
+ stream = new ByteArrayInputStream(new byte[0]);
+ } else {
+ resourceClaim = flowFile.getContentClaim().getResourceClaim();
+ stream = contentRepository.read(flowFile.getContentClaim());
+ }
+
+ // Register a Provenance Event to indicate that the data was downloaded.
+ final StandardProvenanceEventRecord.Builder sendEventBuilder = new StandardProvenanceEventRecord.Builder()
+ .setEventType(ProvenanceEventType.DOWNLOAD)
+ .setFlowFileUUID(flowFile.getAttribute(CoreAttributes.UUID.key()))
+ .setAttributes(flowFile.getAttributes(), Collections.<String, String> emptyMap())
+ .setTransitUri(requestUri)
+ .setEventTime(System.currentTimeMillis())
+ .setFlowFileEntryDate(flowFile.getEntryDate())
+ .setLineageStartDate(flowFile.getLineageStartDate())
+ .setComponentType(getName())
+ .setComponentId(getRootGroupId())
+ .setDetails("Download of Content requested by " + requestor + " for " + flowFile);
+
+ if (contentClaim != null) {
+ sendEventBuilder.setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), contentClaim.getOffset(), flowFile.getSize());
+ }
+
+ final ProvenanceEventRecord sendEvent = sendEventBuilder.build();
+ provenanceEventRepository.registerEvent(sendEvent);
+ return stream;
}
private String getReplayFailureReason(final ProvenanceEventRecord event) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/245d8d4d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index cf0d185..9935ba4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -35,11 +35,16 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.nifi.attribute.expression.language.PreparedQuery;
+import org.apache.nifi.attribute.expression.language.Query;
+import org.apache.nifi.attribute.expression.language.exception.AttributeExpressionLanguageParsingException;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.queue.DropFlowFileState;
import org.apache.nifi.controller.queue.DropFlowFileStatus;
import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.FlowFileSummary;
import org.apache.nifi.controller.queue.ListFlowFileRequest;
+import org.apache.nifi.controller.queue.ListFlowFileState;
import org.apache.nifi.controller.queue.ListFlowFileStatus;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.queue.SortColumn;
@@ -53,8 +58,10 @@ import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.expression.AttributeExpression.ResultType;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult;
@@ -100,7 +107,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
private final int swapThreshold;
private final FlowFileSwapManager swapManager;
private final List<String> swapLocations = new ArrayList<>();
- @SuppressWarnings("unused")
private final TimedLock readLock;
private final TimedLock writeLock;
private final String identifier;
@@ -841,16 +847,194 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
@Override
- public ListFlowFileStatus listFlowFiles(final String requestIdentifier) {
- return listFlowFiles(requestIdentifier, SortColumn.QUEUE_POSITION, SortDirection.ASCENDING);
+ public ListFlowFileStatus listFlowFiles(final String requestIdentifier, final int maxResults) {
+ return listFlowFiles(requestIdentifier, maxResults, SortColumn.QUEUE_POSITION, SortDirection.ASCENDING);
}
@Override
- public ListFlowFileStatus listFlowFiles(final String requestIdentifier, final SortColumn sortColumn, final SortDirection direction) {
- // TODO: Implement
- return null;
+ public ListFlowFileStatus listFlowFiles(final String requestIdentifier, final int maxResults, final SortColumn sortColumn, final SortDirection direction) {
+ return listFlowFiles(requestIdentifier, maxResults, null, sortColumn, direction);
+ }
+
+ @Override
+ public ListFlowFileStatus listFlowFiles(final String requestIdentifier, final int maxResults, final String query, final SortColumn sortColumn, final SortDirection direction) {
+ final PreparedQuery preparedQuery;
+ if (query == null) {
+ preparedQuery = null;
+ } else {
+ try {
+ final ResultType resultType = Query.compile(query).getResultType();
+ if (resultType != ResultType.BOOLEAN) {
+ throw new IllegalArgumentException("Invalid expression Language provided to search the listing of FlowFiles. "
+ + "The expression must return a 'Boolean' type but returns a " + resultType.name() + " type");
+ }
+ preparedQuery = Query.prepare(query);
+ } catch (final AttributeExpressionLanguageParsingException e) {
+ throw new IllegalArgumentException("Invalid Expression Language provided to search the listing of FlowFiles: " + query, e);
+ }
+ }
+
+ // purge any old requests from the map just to keep it clean. But if there are very few requests, which is usually the case, then don't bother
+ if (listRequestMap.size() > 10) {
+ final List<String> toDrop = new ArrayList<>();
+ for (final Map.Entry<String, ListFlowFileRequest> entry : listRequestMap.entrySet()) {
+ final ListFlowFileRequest request = entry.getValue();
+ final boolean completed = request.getState() == ListFlowFileState.COMPLETE || request.getState() == ListFlowFileState.FAILURE;
+
+ if (completed && System.currentTimeMillis() - request.getLastUpdated() > TimeUnit.MINUTES.toMillis(5L)) {
+ toDrop.add(entry.getKey());
+ }
+ }
+
+ for (final String requestId : toDrop) {
+ listRequestMap.remove(requestId);
+ }
+ }
+
+ final int numSteps;
+ readLock.lock();
+ try {
+ numSteps = 1 + swapLocations.size();
+ } finally {
+ readLock.unlock("listFlowFiles");
+ }
+
+ final ListFlowFileRequest listRequest = new ListFlowFileRequest(requestIdentifier, sortColumn, direction, size(), numSteps);
+
+ final Thread t = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ int position = 0;
+ int resultCount = 0;
+ final List<FlowFileSummary> summaries = new ArrayList<>(activeQueue.size());
+
+ // we need a write lock while using the Active Queue because we can't iterate over it - we have to poll from it
+ // continually. This is because the iterator for PriorityQueue does not iterate over the elements in any particular
+ // order. Since we need the 'position' of the element in the queue, we need to iterate over them in the proper order.
+ writeLock.lock();
+ try {
+ final List<FlowFileRecord> flowFileRecords = new ArrayList<>(activeQueue.size());
+
+ FlowFileRecord flowFile;
+ try {
+ while ((flowFile = activeQueue.poll()) != null) {
+ flowFileRecords.add(flowFile);
+ position++;
+
+ if (preparedQuery == null || "true".equals(preparedQuery.evaluateExpressions(flowFile))) {
+ summaries.add(summarize(flowFile, position));
+ if (++resultCount >= maxResults) {
+ break;
+ }
+ }
+ }
+ } finally {
+ activeQueue.addAll(flowFileRecords);
+ }
+ } finally {
+ writeLock.unlock("List FlowFiles");
+ }
+
+ position = activeQueue.size();
+ sourceLoop: while (resultCount < maxResults) {
+ try {
+ // We are now iterating over swap files, and we don't need the write lock for this, just the read lock, since
+ // we are not modifying anything.
+ readLock.lock();
+ try {
+ for (final String location : swapLocations) {
+ final List<FlowFileRecord> flowFiles = swapManager.peek(location, StandardFlowFileQueue.this);
+ for (final FlowFileRecord flowFile : flowFiles) {
+ position++;
+
+ if (preparedQuery == null || "true".equals(preparedQuery.evaluateExpressions(flowFile))) {
+ summaries.add(summarize(flowFile, position));
+ if (++resultCount >= maxResults) {
+ break sourceLoop;
+ }
+ }
+ }
+ }
+
+ for (final FlowFileRecord flowFile : swapQueue) {
+ position++;
+
+ if (preparedQuery == null || "true".equals(preparedQuery.evaluateExpressions(flowFile))) {
+ summaries.add(summarize(flowFile, position));
+ if (++resultCount >= maxResults) {
+ break sourceLoop;
+ }
+ }
+ }
+ } finally {
+ readLock.unlock("List FlowFiles");
+ }
+ } catch (final IOException ioe) {
+ logger.error("Failed to read swapped FlowFiles in order to perform listing of queue " + StandardFlowFileQueue.this, ioe);
+ listRequest.setFailure("Could not read FlowFiles from queue. Check log files for more details.");
+ }
+ }
+ }
+ }, "List FlowFiles for Connection " + getIdentifier());
+ t.setDaemon(true);
+ t.start();
+
+ listRequestMap.put(requestIdentifier, listRequest);
+ return listRequest;
}
+ private FlowFileSummary summarize(final FlowFile flowFile, final int position) {
+ // extract all of the information that we care about into new variables rather than just
+ // wrapping the FlowFile object with a FlowFileSummary object. We do this because we want to
+ // be able to hold many FlowFileSummary objects in memory and if we just wrap the FlowFile object,
+ // we will end up holding the entire FlowFile (including all Attributes) in the Java heap as well,
+ // which can be problematic if we expect them to be swapped out.
+ final String uuid = flowFile.getAttribute(CoreAttributes.UUID.key());
+ final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key());
+ final long size = flowFile.getSize();
+ final long lastQueuedTime = flowFile.getLastQueueDate();
+ final long lineageStart = flowFile.getLineageStartDate();
+ final boolean penalized = flowFile.isPenalized();
+
+ return new FlowFileSummary() {
+ @Override
+ public String getUuid() {
+ return uuid;
+ }
+
+ @Override
+ public String getFilename() {
+ return filename;
+ }
+
+ @Override
+ public int getPosition() {
+ return position;
+ }
+
+ @Override
+ public long getSize() {
+ return size;
+ }
+
+ @Override
+ public long lastQueuedTime() {
+ return lastQueuedTime;
+ }
+
+ @Override
+ public long getLineageStartDate() {
+ return lineageStart;
+ }
+
+ @Override
+ public boolean isPenalized() {
+ return penalized;
+ }
+ };
+ }
+
+
@Override
public ListFlowFileStatus getListFlowFileStatus(final String requestIdentifier) {
return listRequestMap.get(requestIdentifier);
@@ -858,6 +1042,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
@Override
public ListFlowFileStatus cancelListFlowFileRequest(final String requestIdentifier) {
+ logger.info("Canceling ListFlowFile Request with ID {}", requestIdentifier);
final ListFlowFileRequest request = listRequestMap.remove(requestIdentifier);
if (request != null) {
request.cancel();
@@ -867,8 +1052,41 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
}
@Override
- public FlowFileRecord getFlowFile(String flowFileUuid) {
- // TODO: Implement
+ public FlowFileRecord getFlowFile(final String flowFileUuid) throws IOException {
+ if (flowFileUuid == null) {
+ return null;
+ }
+
+ readLock.lock();
+ try {
+ // read through all of the FlowFiles in the queue, looking for the FlowFile with the given ID
+ for (final FlowFileRecord flowFile : activeQueue) {
+ if (flowFileUuid.equals(flowFile.getAttribute(CoreAttributes.UUID.key()))) {
+ return flowFile;
+ }
+ }
+
+ for (final FlowFileRecord flowFile : swapQueue) {
+ if (flowFileUuid.equals(flowFile.getAttribute(CoreAttributes.UUID.key()))) {
+ return flowFile;
+ }
+ }
+
+ // TODO: consider using a Long flowFileId instead of a UUID, and then having the swap manager
+ // write out the min and max FlowFile ID's. This would allow us to then have a method: boolean isFlowFilePossiblyContained(long id)
+ // which can return a boolean value that can be used to determine whether or not to even call peek
+ for (final String swapLocation : swapLocations) {
+ final List<FlowFileRecord> flowFiles = swapManager.peek(swapLocation, this);
+ for (final FlowFileRecord flowFile : flowFiles) {
+ if (flowFileUuid.equals(flowFile.getAttribute(CoreAttributes.UUID.key()))) {
+ return flowFile;
+ }
+ }
+ }
+ } finally {
+ readLock.unlock("getFlowFile");
+ }
+
return null;
}
[2/5] nifi git commit: Merge branch 'NIFI-108' of
https://git-wip-us.apache.org/repos/asf/nifi into NIFI-108
Posted by ma...@apache.org.
Merge branch 'NIFI-108' of https://git-wip-us.apache.org/repos/asf/nifi into NIFI-108
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/64a415cb
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/64a415cb
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/64a415cb
Branch: refs/heads/NIFI-108
Commit: 64a415cb443ad03b10c12a5246338848ccfd1f80
Parents: 245d8d4 e762d3c
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Dec 17 13:04:19 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Dec 17 13:04:19 2015 -0500
----------------------------------------------------------------------
.../apache/nifi/web/api/dto/DropRequestDTO.java | 11 +-
.../nifi/web/api/dto/FlowFileSummaryDTO.java | 134 +++++++++
.../nifi/web/api/dto/ListingRequestDTO.java | 169 +++++++++++
.../web/api/entity/ListingRequestEntity.java | 44 +++
.../org/apache/nifi/web/NiFiServiceFacade.java | 31 ++
.../nifi/web/StandardNiFiServiceFacade.java | 18 +-
.../apache/nifi/web/api/ConnectionResource.java | 301 ++++++++++++++++++-
.../org/apache/nifi/web/api/dto/DtoFactory.java | 46 +++
.../org/apache/nifi/web/dao/ConnectionDAO.java | 33 +-
.../web/dao/impl/StandardConnectionDAO.java | 36 ++-
10 files changed, 807 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
[3/5] nifi git commit: NIFI-108: Added merging of response for
listing of flowfiles in cluster manager
Posted by ma...@apache.org.
NIFI-108: Added merging of response for listing of flowfiles in cluster manager
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/9408507e
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/9408507e
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/9408507e
Branch: refs/heads/NIFI-108
Commit: 9408507eb5733714537afb6266e6fe886609afdc
Parents: 64a415c
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Dec 17 16:46:12 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Dec 17 16:46:12 2015 -0500
----------------------------------------------------------------------
.../nifi/controller/queue/FlowFileSummary.java | 2 +-
.../controller/queue/ListFlowFileStatus.java | 15 +++
.../nifi/controller/queue/SortColumn.java | 64 +++++++++--
.../nifi/web/api/dto/ListingRequestDTO.java | 76 ++++++++++++-
.../cluster/manager/impl/WebClusterManager.java | 111 ++++++++++++++++---
.../controller/queue/ListFlowFileRequest.java | 34 ++++--
.../nifi/controller/FlowFileSummaries.java | 95 ++++++++++++++++
.../nifi/controller/StandardFlowFileQueue.java | 16 ++-
.../org/apache/nifi/web/api/dto/DtoFactory.java | 12 +-
.../web/dao/impl/StandardConnectionDAO.java | 2 +-
10 files changed, 384 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/9408507e/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileSummary.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileSummary.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileSummary.java
index 8c37185..b7207f2 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileSummary.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileSummary.java
@@ -45,7 +45,7 @@ public interface FlowFileSummary {
/**
* @return the timestamp (in milliseconds since epoch) at which the FlowFile was added to the queue
*/
- long lastQueuedTime();
+ long getLastQueuedTime();
/**
* @return the timestamp (in milliseconds since epoch) at which the FlowFile's greatest ancestor entered the flow
http://git-wip-us.apache.org/repos/asf/nifi/blob/9408507e/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java
index 5781283..cae500d 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileStatus.java
@@ -22,6 +22,11 @@ import java.util.List;
public interface ListFlowFileStatus {
/**
+ * @return the maximum number of FlowFile Summary objects that should be returned
+ */
+ int getMaxResults();
+
+ /**
* @return the identifier of the request to list FlowFiles in the queue
*/
String getRequestIdentifier();
@@ -72,4 +77,14 @@ public interface ListFlowFileStatus {
* @return the percentage (an integer between 0 and 100, inclusive) of how close the request is to being completed
*/
int getCompletionPercentage();
+
+ /**
+ * @return the total number of steps that are required in order to finish the listing
+ */
+ int getTotalStepCount();
+
+ /**
+ * @return the total number of steps that have already been completed. The value returned will be >= 0 and <= the result of calling {@link #getTotalStepCount()}.
+ */
+ int getCompletedStepCount();
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/9408507e/nifi-api/src/main/java/org/apache/nifi/controller/queue/SortColumn.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/SortColumn.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/SortColumn.java
index 48f3e9f..30d285c 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/SortColumn.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/SortColumn.java
@@ -17,44 +17,92 @@
package org.apache.nifi.controller.queue;
+import java.util.Comparator;
+
/**
* Specifies which column to sort on when performing a Listing of FlowFiles via
* {@link FlowFileQueue#listFlowFiles(String, SortColumn, SortDirection)}
*/
-public enum SortColumn {
+public enum SortColumn implements Comparator<FlowFileSummary> {
/**
* Sort based on the current position in the queue
*/
- QUEUE_POSITION,
+ QUEUE_POSITION (new Comparator<FlowFileSummary>() {
+ @Override
+ public int compare(final FlowFileSummary o1, final FlowFileSummary o2) {
+ return Integer.compare(o1.getPosition(), o2.getPosition());
+ }
+ }),
/**
* Sort based on the UUID of the FlowFile
*/
- FLOWFILE_UUID,
+ FLOWFILE_UUID (new Comparator<FlowFileSummary>() {
+ @Override
+ public int compare(final FlowFileSummary o1, final FlowFileSummary o2) {
+ return o1.getUuid().compareTo(o2.getUuid());
+ }
+ }),
/**
* Sort based on the 'filename' attribute of the FlowFile
*/
- FILENAME,
+ FILENAME (new Comparator<FlowFileSummary>() {
+ @Override
+ public int compare(final FlowFileSummary o1, final FlowFileSummary o2) {
+ return o1.getFilename().compareTo(o2.getFilename());
+ }
+ }),
/**
* Sort based on the size of the FlowFile
*/
- FLOWFILE_SIZE,
+ FLOWFILE_SIZE(new Comparator<FlowFileSummary>() {
+ @Override
+ public int compare(final FlowFileSummary o1, final FlowFileSummary o2) {
+ return Long.compare(o1.getSize(), o2.getSize());
+ }
+ }),
/**
* Sort based on how long the FlowFile has been sitting in the queue
*/
- QUEUED_DURATION,
+ QUEUED_DURATION (new Comparator<FlowFileSummary>() {
+ @Override
+ public int compare(final FlowFileSummary o1, final FlowFileSummary o2) {
+ return -Long.compare(o1.getLastQueuedTime(), o2.getLastQueuedTime());
+ }
+ }),
/**
* Sort based on the age of the FlowFile. I.e., the time at which the FlowFile's
* "greatest ancestor" entered the flow
*/
- FLOWFILE_AGE,
+ FLOWFILE_AGE (new Comparator<FlowFileSummary>() {
+ @Override
+ public int compare(final FlowFileSummary o1, final FlowFileSummary o2) {
+ return Long.compare(o1.getLineageStartDate(), o2.getLineageStartDate());
+ }
+ }),
/**
* Sort based on when the FlowFile's penalization ends
*/
- PENALIZATION;
+ PENALIZATION (new Comparator<FlowFileSummary>() {
+ @Override
+ public int compare(final FlowFileSummary o1, final FlowFileSummary o2) {
+ return Boolean.compare(o1.isPenalized(), o2.isPenalized());
+ }
+ });
+
+ private final Comparator<FlowFileSummary> comparator;
+
+ private SortColumn(final Comparator<FlowFileSummary> comparator) {
+ this.comparator = comparator;
+ }
+
+ @Override
+ public int compare(final FlowFileSummary o1, final FlowFileSummary o2) {
+ return comparator.compare(o1, o2);
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/9408507e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ListingRequestDTO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ListingRequestDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ListingRequestDTO.java
index 53c2a74..36c0518 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ListingRequestDTO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ListingRequestDTO.java
@@ -16,13 +16,15 @@
*/
package org.apache.nifi.web.api.dto;
-import com.wordnik.swagger.annotations.ApiModelProperty;
-import org.apache.nifi.web.api.dto.util.TimestampAdapter;
-
-import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
import java.util.Date;
import java.util.List;
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+
+import org.apache.nifi.web.api.dto.util.TimestampAdapter;
+
+import com.wordnik.swagger.annotations.ApiModelProperty;
+
public class ListingRequestDTO {
private String id;
@@ -34,6 +36,11 @@ public class ListingRequestDTO {
private Integer percentCompleted;
private Boolean finished;
private String failureReason;
+ private String sortColumn;
+ private String sortDirection;
+ private Integer maxResults;
+ private Integer totalStepCount;
+ private Integer completedStepCount;
private String state;
@@ -166,4 +173,65 @@ public class ListingRequestDTO {
public void setFlowFileSummaries(List<FlowFileSummaryDTO> flowFileSummaries) {
this.flowFileSummaries = flowFileSummaries;
}
+
+ /**
+ * @return the column on which the listing is sorted
+ */
+ @ApiModelProperty(value = "The column on which the FlowFiles are sorted.")
+ public String getSortColumn() {
+ return sortColumn;
+ }
+
+ public void setSortColumn(String sortColumn) {
+ this.sortColumn = sortColumn;
+ }
+
+ /**
+ * @return the direction in which the FlowFiles are sorted
+ */
+ @ApiModelProperty(value = "The direction in which the FlowFiles are sorted. Either ASCENDING or DESCENDING.")
+ public String getSortDirection() {
+ return sortDirection;
+ }
+
+ public void setSortDirection(String sortDirection) {
+ this.sortDirection = sortDirection;
+ }
+
+ /**
+ * @return the maximum number of FlowFileSummary objects to return
+ */
+ @ApiModelProperty(value = "The maximum number of FlowFileSummary objects to return")
+ public Integer getMaxResults() {
+ return maxResults;
+ }
+
+ public void setMaxResults(Integer maxResults) {
+ this.maxResults = maxResults;
+ }
+
+
+ /**
+ * @return the total number of steps required to complete the listing
+ */
+ @ApiModelProperty(value = "The total number of steps required to complete the listing")
+ public Integer getTotalStepCount() {
+ return totalStepCount;
+ }
+
+ public void setTotalStepCount(Integer totalStepCount) {
+ this.totalStepCount = totalStepCount;
+ }
+
+ /**
+ * @return the number of steps that have already been completed. This value will be >= 0 and <= the total step count
+ */
+ @ApiModelProperty(value = "The number of steps that have already been completed. This value will be between 0 and the total step count (inclusive)")
+ public Integer getCompletedStepCount() {
+ return completedStepCount;
+ }
+
+ public void setCompletedStepCount(Integer completedStepCount) {
+ this.completedStepCount = completedStepCount;
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/9408507e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index 035a888..6b43f9d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -34,11 +34,13 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
+import java.util.NavigableSet;
import java.util.Queue;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeMap;
+import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
@@ -125,6 +127,7 @@ import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.FlowFileSummaries;
import org.apache.nifi.controller.Heartbeater;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ScheduledState;
@@ -132,6 +135,10 @@ import org.apache.nifi.controller.StandardFlowSerializer;
import org.apache.nifi.controller.StandardProcessorNode;
import org.apache.nifi.controller.ValidationContextFactory;
import org.apache.nifi.controller.exception.ComponentLifeCycleException;
+import org.apache.nifi.controller.queue.DropFlowFileState;
+import org.apache.nifi.controller.queue.ListFlowFileState;
+import org.apache.nifi.controller.queue.SortColumn;
+import org.apache.nifi.controller.queue.SortDirection;
import org.apache.nifi.controller.reporting.ClusteredReportingTaskNode;
import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
import org.apache.nifi.controller.reporting.ReportingTaskProvider;
@@ -142,6 +149,7 @@ import org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent;
import org.apache.nifi.controller.service.ControllerServiceLoader;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.controller.service.StandardControllerServiceProvider;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
@@ -192,13 +200,19 @@ import org.apache.nifi.util.ReflectionUtils;
import org.apache.nifi.web.OptimisticLockingManager;
import org.apache.nifi.web.Revision;
import org.apache.nifi.web.UpdateRevision;
+import org.apache.nifi.web.api.dto.ControllerServiceDTO;
+import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
+import org.apache.nifi.web.api.dto.DropRequestDTO;
+import org.apache.nifi.web.api.dto.FlowFileSummaryDTO;
import org.apache.nifi.web.api.dto.FlowSnippetDTO;
+import org.apache.nifi.web.api.dto.ListingRequestDTO;
import org.apache.nifi.web.api.dto.NodeDTO;
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
+import org.apache.nifi.web.api.dto.ReportingTaskDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceRequestDTO;
@@ -207,7 +221,12 @@ import org.apache.nifi.web.api.dto.status.ClusterStatusHistoryDTO;
import org.apache.nifi.web.api.dto.status.NodeStatusHistoryDTO;
import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO;
+import org.apache.nifi.web.api.entity.ControllerServiceEntity;
+import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity;
+import org.apache.nifi.web.api.entity.ControllerServicesEntity;
+import org.apache.nifi.web.api.entity.DropRequestEntity;
import org.apache.nifi.web.api.entity.FlowSnippetEntity;
+import org.apache.nifi.web.api.entity.ListingRequestEntity;
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.apache.nifi.web.api.entity.ProcessorsEntity;
@@ -215,6 +234,8 @@ import org.apache.nifi.web.api.entity.ProvenanceEntity;
import org.apache.nifi.web.api.entity.ProvenanceEventEntity;
import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
import org.apache.nifi.web.api.entity.RemoteProcessGroupsEntity;
+import org.apache.nifi.web.api.entity.ReportingTaskEntity;
+import org.apache.nifi.web.api.entity.ReportingTasksEntity;
import org.apache.nifi.web.util.WebUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -226,19 +247,6 @@ import org.xml.sax.SAXException;
import org.xml.sax.SAXParseException;
import com.sun.jersey.api.client.ClientResponse;
-import org.apache.nifi.controller.queue.DropFlowFileState;
-
-import org.apache.nifi.controller.service.ControllerServiceState;
-import org.apache.nifi.web.api.dto.ControllerServiceDTO;
-import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
-import org.apache.nifi.web.api.dto.DropRequestDTO;
-import org.apache.nifi.web.api.dto.ReportingTaskDTO;
-import org.apache.nifi.web.api.entity.ControllerServiceEntity;
-import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity;
-import org.apache.nifi.web.api.entity.ControllerServicesEntity;
-import org.apache.nifi.web.api.entity.DropRequestEntity;
-import org.apache.nifi.web.api.entity.ReportingTaskEntity;
-import org.apache.nifi.web.api.entity.ReportingTasksEntity;
/**
* Provides a cluster manager implementation. The manager federates incoming HTTP client requests to the nodes' external API using the HTTP protocol. The manager also communicates with nodes using the
@@ -319,6 +327,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
public static final String REPORTING_TASKS_URI = "/nifi-api/controller/reporting-tasks/node";
public static final Pattern REPORTING_TASK_URI_PATTERN = Pattern.compile("/nifi-api/controller/reporting-tasks/node/[a-f0-9\\-]{36}");
+ @Deprecated
public static final Pattern QUEUE_CONTENTS_URI = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/contents");
public static final Pattern DROP_REQUEST_URI = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/drop-requests/[a-f0-9\\-]{36}");
public static final Pattern LIST_FLOWFILES_URI = Pattern
@@ -2831,6 +2840,64 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
return normalizedValidationErrors;
}
+
+    /**
+     * Merges the listing requests in the specified map into the specified listing request. The target is updated in place with:
+     * the merged FlowFile summaries (ordered by the target's sort column and direction and capped at the target's max results, if set),
+     * the most recent last-updated timestamp, the least-completed state reported by any node, the average completion percentage,
+     * whether every node has finished, and a failure reason if any node reported one.
+     *
+     * @param listingRequest the target listing request
+     * @param listingRequestMap the mapping of all responses being merged
+     */
+    private void mergeListingRequests(final ListingRequestDTO listingRequest, final Map<NodeIdentifier, ListingRequestDTO> listingRequestMap) {
+        // Nothing to merge: leave the target untouched rather than dividing by zero when averaging below.
+        if (listingRequestMap.isEmpty()) {
+            return;
+        }
+
+        final Comparator<FlowFileSummaryDTO> sortComparator = FlowFileSummaries.createDTOComparator(
+            SortColumn.valueOf(listingRequest.getSortColumn()), SortDirection.valueOf(listingRequest.getSortDirection()));
+
+        // A TreeSet discards any element that compares as equal to one it already holds. FlowFiles on different nodes
+        // routinely tie on the sort column (same queue position, same filename, same size, ...), so ties are broken on the
+        // FlowFile UUID to keep distinct FlowFiles from being dropped from the merged listing.
+        final Comparator<FlowFileSummaryDTO> comparator = new Comparator<FlowFileSummaryDTO>() {
+            @Override
+            public int compare(final FlowFileSummaryDTO o1, final FlowFileSummaryDTO o2) {
+                final int sortResult = sortComparator.compare(o1, o2);
+                if (sortResult != 0) {
+                    return sortResult;
+                }
+
+                final String uuid1 = o1.getUuid();
+                final String uuid2 = o2.getUuid();
+                if (uuid1 == null) {
+                    return uuid2 == null ? 0 : -1;
+                }
+                return uuid2 == null ? 1 : uuid1.compareTo(uuid2);
+            }
+        };
+
+        final NavigableSet<FlowFileSummaryDTO> flowFileSummaries = new TreeSet<>(comparator);
+        final Integer maxResults = listingRequest.getMaxResults();
+
+        ListFlowFileState state = null;
+        int sumOfPercents = 0;
+        boolean finished = true;
+        for (final ListingRequestDTO nodeRequest : listingRequestMap.values()) {
+            final Integer percentComplete = nodeRequest.getPercentCompleted();
+            if (percentComplete != null) {
+                sumOfPercents += percentComplete;
+            }
+
+            // A missing 'finished' flag is treated as "not finished" instead of failing on unboxing.
+            if (!Boolean.TRUE.equals(nodeRequest.getFinished())) {
+                finished = false;
+            }
+
+            if (nodeRequest.getLastUpdated().after(listingRequest.getLastUpdated())) {
+                listingRequest.setLastUpdated(nodeRequest.getLastUpdated());
+            }
+
+            // Keep the state with the lowest ordinal value (the "least completed").
+            final ListFlowFileState nodeState = ListFlowFileState.valueOfDescription(nodeRequest.getState());
+            if (nodeState != null && (state == null || state.compareTo(nodeState) > 0)) {
+                state = nodeState;
+            }
+
+            // A node only populates its summaries once its own listing has completed, so they may be absent here.
+            final List<FlowFileSummaryDTO> nodeSummaries = nodeRequest.getFlowFileSummaries();
+            if (nodeSummaries != null) {
+                for (final FlowFileSummaryDTO summaryDTO : nodeSummaries) {
+                    flowFileSummaries.add(summaryDTO);
+
+                    // Keep the set from growing beyond our max
+                    if (maxResults != null && flowFileSummaries.size() > maxResults) {
+                        flowFileSummaries.pollLast();
+                    }
+                }
+            }
+
+            if (nodeRequest.getFailureReason() != null) {
+                listingRequest.setFailureReason(nodeRequest.getFailureReason());
+            }
+        }
+
+        final List<FlowFileSummaryDTO> summaryDTOs = new ArrayList<>(flowFileSummaries);
+        listingRequest.setFlowFileSummaries(summaryDTOs);
+
+        // Report the least-completed node state on the merged request instead of whatever the first node happened to return.
+        if (state != null) {
+            listingRequest.setState(state.toString());
+        }
+
+        final int percentCompleted = sumOfPercents / listingRequestMap.size();
+        listingRequest.setPercentCompleted(percentCompleted);
+        listingRequest.setFinished(finished);
+    }
+
/**
* Merges the drop requests in the specified map into the specified drop request.
*
@@ -3316,7 +3383,23 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
clientResponse = new NodeResponse(clientResponse, responseEntity);
} else if (hasSuccessfulClientResponse && isListFlowFilesEndpoint(uri, method)) {
- // TODO: IMPLEMENT
+ final ListingRequestEntity responseEntity = clientResponse.getClientResponse().getEntity(ListingRequestEntity.class);
+ final ListingRequestDTO listingRequest = responseEntity.getListingRequest();
+
+ final Map<NodeIdentifier, ListingRequestDTO> resultsMap = new HashMap<>();
+ for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
+ if (problematicNodeResponses.contains(nodeResponse)) {
+ continue;
+ }
+
+ final ListingRequestEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ListingRequestEntity.class);
+ final ListingRequestDTO nodeListingRequest = nodeResponseEntity.getListingRequest();
+
+ resultsMap.put(nodeResponse.getNodeId(), nodeListingRequest);
+ }
+ mergeListingRequests(listingRequest, resultsMap);
+
+ clientResponse = new NodeResponse(clientResponse, responseEntity);
} else {
if (!nodeResponsesToDrain.isEmpty()) {
drainResponses(nodeResponsesToDrain);
http://git-wip-us.apache.org/repos/asf/nifi/blob/9408507e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java
index e83fd80..aad4c4f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/ListFlowFileRequest.java
@@ -23,6 +23,7 @@ import java.util.List;
public class ListFlowFileRequest implements ListFlowFileStatus {
private final String requestId;
+ private final int maxResults;
private final QueueSize queueSize;
private final SortColumn sortColumn;
private final SortDirection sortDirection;
@@ -32,13 +33,14 @@ public class ListFlowFileRequest implements ListFlowFileStatus {
private ListFlowFileState state;
private String failureReason;
private int numSteps;
- private int numCompletedSteps;
+ private int completedStepCount;
private long lastUpdated = System.currentTimeMillis();
- public ListFlowFileRequest(final String requestId, final SortColumn sortColumn, final SortDirection sortDirection, final QueueSize queueSize, final int numSteps) {
+ public ListFlowFileRequest(final String requestId, final SortColumn sortColumn, final SortDirection sortDirection, final int maxResults, final QueueSize queueSize, final int numSteps) {
this.requestId = requestId;
this.sortColumn = sortColumn;
this.sortDirection = sortDirection;
+ this.maxResults = maxResults;
this.queueSize = queueSize;
this.numSteps = numSteps;
}
@@ -94,11 +96,10 @@ public class ListFlowFileRequest implements ListFlowFileStatus {
return Collections.unmodifiableList(flowFileSummaries);
}
- public synchronized void addFlowFileSummaries(final List<FlowFileSummary> summaries) {
- // TODO: Implement.
-
+ public synchronized void setFlowFileSummaries(final List<FlowFileSummary> summaries) {
+ this.flowFileSummaries.clear();
+ this.flowFileSummaries.addAll(summaries);
lastUpdated = System.currentTimeMillis();
- this.numCompletedSteps++;
}
@Override
@@ -117,6 +118,25 @@ public class ListFlowFileRequest implements ListFlowFileStatus {
@Override
public synchronized int getCompletionPercentage() {
- return (int) (100F * numCompletedSteps / numSteps);
+ return (int) (100F * completedStepCount / numSteps);
+ }
+
+ public synchronized void setCompletedStepCount(final int completedStepCount) {
+ this.completedStepCount = completedStepCount;
+ }
+
+ @Override
+ public int getMaxResults() {
+ return maxResults;
+ }
+
+ @Override
+ public int getTotalStepCount() {
+ return numSteps;
+ }
+
+    /**
+     * @return the number of steps completed so far. Synchronized, like {@link #setCompletedStepCount(int)} and
+     *         {@link #getCompletionPercentage()}, so that a thread polling this request observes the value most
+     *         recently written by the thread that is performing the listing.
+     */
+    @Override
+    public synchronized int getCompletedStepCount() {
+        return completedStepCount;
    }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/9408507e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowFileSummaries.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowFileSummaries.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowFileSummaries.java
new file mode 100644
index 0000000..5a0a3ab
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowFileSummaries.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller;
+
+import java.util.Collections;
+import java.util.Comparator;
+
+import org.apache.nifi.controller.queue.FlowFileSummary;
+import org.apache.nifi.controller.queue.SortColumn;
+import org.apache.nifi.controller.queue.SortDirection;
+import org.apache.nifi.web.api.dto.FlowFileSummaryDTO;
+
+/**
+ * Factory methods for the {@link Comparator}s used to order a listing of FlowFiles, both for the framework-level
+ * {@link FlowFileSummary} objects and for their {@link FlowFileSummaryDTO} representations.
+ */
+public class FlowFileSummaries {
+
+    /**
+     * Creates a Comparator that orders {@link FlowFileSummary} objects on the given column.
+     *
+     * @param column the column to sort on
+     * @param direction the direction to sort in; {@link SortDirection#DESCENDING} reverses the column's ordering, any other value leaves it as-is
+     * @return a Comparator for the given column and direction
+     */
+    public static Comparator<FlowFileSummary> createComparator(final SortColumn column, final SortDirection direction) {
+        final Comparator<FlowFileSummary> comparator = new Comparator<FlowFileSummary>() {
+            @Override
+            public int compare(final FlowFileSummary o1, final FlowFileSummary o2) {
+                switch (column) {
+                    case FILENAME:
+                        return o1.getFilename().compareTo(o2.getFilename());
+                    case FLOWFILE_AGE:
+                        return Long.compare(o1.getLineageStartDate(), o2.getLineageStartDate());
+                    case FLOWFILE_SIZE:
+                        return Long.compare(o1.getSize(), o2.getSize());
+                    case FLOWFILE_UUID:
+                        return o1.getUuid().compareTo(o2.getUuid());
+                    case PENALIZATION:
+                        return Boolean.compare(o1.isPenalized(), o2.isPenalized());
+                    case QUEUE_POSITION:
+                        return Long.compare(o1.getPosition(), o2.getPosition());
+                    case QUEUED_DURATION:
+                        // The longer a FlowFile has been queued, the earlier its last-queued time, so the operands are
+                        // swapped. This keeps the ordering in agreement with SortColumn.QUEUED_DURATION.
+                        return Long.compare(o2.getLastQueuedTime(), o1.getLastQueuedTime());
+                }
+
+                return 0;
+            }
+        };
+
+
+        if (direction == SortDirection.DESCENDING) {
+            return Collections.reverseOrder(comparator);
+        } else {
+            return comparator;
+        }
+    }
+
+    /**
+     * Creates a Comparator that orders {@link FlowFileSummaryDTO} objects on the given column, mirroring the ordering
+     * produced by {@link #createComparator(SortColumn, SortDirection)} for the corresponding {@link FlowFileSummary} objects.
+     *
+     * @param column the column to sort on
+     * @param direction the direction to sort in; {@link SortDirection#DESCENDING} reverses the column's ordering, any other value leaves it as-is
+     * @return a Comparator for the given column and direction
+     */
+    public static Comparator<FlowFileSummaryDTO> createDTOComparator(final SortColumn column, final SortDirection direction) {
+        final Comparator<FlowFileSummaryDTO> comparator = new Comparator<FlowFileSummaryDTO>() {
+            @Override
+            public int compare(final FlowFileSummaryDTO o1, final FlowFileSummaryDTO o2) {
+                switch (column) {
+                    case FILENAME:
+                        return o1.getFilename().compareTo(o2.getFilename());
+                    case FLOWFILE_AGE:
+                        return o1.getLinageStartDate().compareTo(o2.getLinageStartDate());
+                    case FLOWFILE_SIZE:
+                        return Long.compare(o1.getSize(), o2.getSize());
+                    case FLOWFILE_UUID:
+                        return o1.getUuid().compareTo(o2.getUuid());
+                    case PENALIZATION:
+                        return Boolean.compare(o1.getPenalized(), o2.getPenalized());
+                    case QUEUE_POSITION:
+                        return Long.compare(o1.getPosition(), o2.getPosition());
+                    case QUEUED_DURATION:
+                        // Operands swapped for the same reason as in createComparator: an earlier last-queued time
+                        // means a longer queued duration.
+                        return o2.getLastQueuedTime().compareTo(o1.getLastQueuedTime());
+                }
+
+                return 0;
+            }
+        };
+
+        if (direction == SortDirection.DESCENDING) {
+            return Collections.reverseOrder(comparator);
+        } else {
+            return comparator;
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/9408507e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index 9935ba4..daaa763 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -894,12 +894,13 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
final int numSteps;
readLock.lock();
try {
- numSteps = 1 + swapLocations.size();
+ // numSteps = 1 for each swap location + 1 for active queue + 1 for swap queue.
+ numSteps = 2 + swapLocations.size();
} finally {
readLock.unlock("listFlowFiles");
}
- final ListFlowFileRequest listRequest = new ListFlowFileRequest(requestIdentifier, sortColumn, direction, size(), numSteps);
+ final ListFlowFileRequest listRequest = new ListFlowFileRequest(requestIdentifier, sortColumn, direction, maxResults, size(), numSteps);
final Thread t = new Thread(new Runnable() {
@Override
@@ -907,6 +908,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
int position = 0;
int resultCount = 0;
final List<FlowFileSummary> summaries = new ArrayList<>(activeQueue.size());
+ int completedStepCount = 0;
// we need a write lock while using the Active Queue because we can't iterate over it - we have to poll from it
// continually. This is because the iterator for PriorityQueue does not iterate over the elements in any particular
@@ -935,6 +937,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
writeLock.unlock("List FlowFiles");
}
+ listRequest.setCompletedStepCount(++completedStepCount);
+
position = activeQueue.size();
sourceLoop: while (resultCount < maxResults) {
try {
@@ -954,6 +958,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
}
}
}
+
+ listRequest.setCompletedStepCount(++completedStepCount);
}
for (final FlowFileRecord flowFile : swapQueue) {
@@ -966,6 +972,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
}
}
}
+
+ listRequest.setCompletedStepCount(++completedStepCount);
} finally {
readLock.unlock("List FlowFiles");
}
@@ -974,6 +982,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
listRequest.setFailure("Could not read FlowFiles from queue. Check log files for more details.");
}
}
+
+ listRequest.setFlowFileSummaries(summaries);
}
}, "List FlowFiles for Connection " + getIdentifier());
t.setDaemon(true);
@@ -1018,7 +1028,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
}
@Override
- public long lastQueuedTime() {
+ public long getLastQueuedTime() {
return lastQueuedTime;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/9408507e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index b93ff95..0805b54 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -360,10 +360,14 @@ public final class DtoFactory {
dto.setState(listingRequest.getState().toString());
dto.setFailureReason(listingRequest.getFailureReason());
dto.setFinished(isListingRequestComplete(listingRequest.getState()));
+ dto.setMaxResults(listingRequest.getMaxResults());
+ dto.setSortColumn(listingRequest.getSortColumn().name());
+ dto.setSortDirection(listingRequest.getSortDirection().name());
+ dto.setTotalStepCount(listingRequest.getTotalStepCount());
+ dto.setCompletedStepCount(listingRequest.getCompletedStepCount());
+ dto.setPercentCompleted(listingRequest.getCompletionPercentage());
if (isListingRequestComplete(listingRequest.getState())) {
- dto.setPercentCompleted(100);
-
final List<FlowFileSummary> flowFileSummaries = listingRequest.getFlowFileSummaries();
if (flowFileSummaries != null) {
final List<FlowFileSummaryDTO> summaryDtos = new ArrayList<>(flowFileSummaries.size());
@@ -372,8 +376,6 @@ public final class DtoFactory {
}
dto.setFlowFileSummaries(summaryDtos);
}
- } else {
- dto.setPercentCompleted(50);
}
return dto;
@@ -383,7 +385,7 @@ public final class DtoFactory {
final FlowFileSummaryDTO dto = new FlowFileSummaryDTO();
dto.setUuid(summary.getUuid());
dto.setFilename(summary.getFilename());
- dto.setLastQueuedTime(new Date(summary.lastQueuedTime()));
+ dto.setLastQueuedTime(new Date(summary.getLastQueuedTime()));
dto.setLinageStartDate(new Date(summary.getLineageStartDate()));
dto.setPenalized(summary.isPenalized());
dto.setPosition(summary.getPosition());
http://git-wip-us.apache.org/repos/asf/nifi/blob/9408507e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
index 0e9a90a..459c2b5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
@@ -342,7 +342,7 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO
public ListFlowFileStatus createFlowFileListingRequest(String groupId, String id, String listingRequestId) {
final Connection connection = locateConnection(groupId, id);
final FlowFileQueue queue = connection.getFlowFileQueue();
- return queue.listFlowFiles(listingRequestId);
+ return queue.listFlowFiles(listingRequestId, 100);
}
@Override
[4/5] nifi git commit: Merge branch 'NIFI-108' of
https://git-wip-us.apache.org/repos/asf/nifi into NIFI-108
Posted by ma...@apache.org.
Merge branch 'NIFI-108' of https://git-wip-us.apache.org/repos/asf/nifi into NIFI-108
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/f4ef8847
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/f4ef8847
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/f4ef8847
Branch: refs/heads/NIFI-108
Commit: f4ef8847571889aa4627654a912bdbb7308865bf
Parents: 9408507 6707337
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Dec 17 16:46:14 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Dec 17 16:46:14 2015 -0500
----------------------------------------------------------------------
.../apache/nifi/web/api/dto/FlowFileDTO.java | 40 +++
.../nifi/web/api/dto/FlowFileSummaryDTO.java | 32 ++
.../nifi/web/api/entity/FlowFileEntity.java | 44 +++
.../org/apache/nifi/web/NiFiServiceFacade.java | 22 ++
.../nifi/web/StandardNiFiServiceFacade.java | 11 +
.../apache/nifi/web/api/ConnectionResource.java | 333 ++++++++++++++++---
.../org/apache/nifi/web/api/dto/DtoFactory.java | 14 +
.../org/apache/nifi/web/dao/ConnectionDAO.java | 25 +-
.../web/dao/impl/StandardConnectionDAO.java | 79 +++++
9 files changed, 556 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/f4ef8847/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/f4ef8847/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
----------------------------------------------------------------------
[5/5] nifi git commit: NIFI-108: Add clusterNodeId to
FlowFileSummaryDTO
Posted by ma...@apache.org.
NIFI-108: Add clusterNodeId to FlowFileSummaryDTO
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b12aba78
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b12aba78
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b12aba78
Branch: refs/heads/NIFI-108
Commit: b12aba7829e91b228568e5a3aa75558c3c227247
Parents: f4ef884
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Dec 17 16:54:13 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Dec 17 16:54:13 2015 -0500
----------------------------------------------------------------------
.../org/apache/nifi/cluster/manager/impl/WebClusterManager.java | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/b12aba78/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index 6b43f9d..210cf52 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -2856,7 +2856,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
ListFlowFileState state = null;
int sumOfPercents = 0;
boolean finished = true;
- for (final ListingRequestDTO nodeRequest : listingRequestMap.values()) {
+ for (final Map.Entry<NodeIdentifier, ListingRequestDTO> entry : listingRequestMap.entrySet()) {
+ final ListingRequestDTO nodeRequest = entry.getValue();
Integer percentComplete = nodeRequest.getPercentCompleted();
if (percentComplete != null) {
sumOfPercents += percentComplete;
@@ -2877,6 +2878,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
for (final FlowFileSummaryDTO summaryDTO : nodeRequest.getFlowFileSummaries()) {
+ summaryDTO.setClusterNodeId(entry.getKey().getId());
flowFileSummaries.add(summaryDTO);
// Keep the set from growing beyond our max