You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/10/14 15:46:29 UTC

nifi git commit: NIFI-730: Return DropFlowFileStatus object when calling cancel

Repository: nifi
Updated Branches:
  refs/heads/NIFI-730 77f7d7524 -> 0af1acaaf


NIFI-730: Return DropFlowFileStatus object when calling cancel


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0af1acaa
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0af1acaa
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0af1acaa

Branch: refs/heads/NIFI-730
Commit: 0af1acaafaf28844b31c41e276c4dbce18390acb
Parents: 77f7d75
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Oct 14 09:46:21 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed Oct 14 09:46:21 2015 -0400

----------------------------------------------------------------------
 .../nifi/controller/queue/FlowFileQueue.java     | 10 ++++++----
 .../nifi/controller/StandardFlowFileQueue.java   | 19 ++++++++++++++-----
 .../controller/TestStandardFlowFileQueue.java    |  7 ++-----
 3 files changed, 22 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/0af1acaa/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
index 2d67d58..dcf7f13 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
@@ -199,11 +199,13 @@ public interface FlowFileQueue {
     DropFlowFileStatus getDropFlowFileStatus(String requestIdentifier);
 
     /**
-     * Cancels the request to drop FlowFiles that has the given identifier
+     * Cancels the request to drop FlowFiles that has the given identifier. After this method is called, the request
+     * will no longer be known by this queue, so subsequent calls to {@link #getDropFlowFileStatus(String)} or
+     * {@link #cancelDropFlowFileRequest(String)} will return <code>null</code>.
      *
      * @param requestIdentifier the identifier of the Drop FlowFile Request
-     * @return <code>true</code> if the request was canceled, <code>false</code> if the request has
-     *         already completed or is not known
+     * @return the status for the request with the given identifier after it has been canceled, or <code>null</code> if no
+     *         request status exists with that identifier
      */
-    boolean cancelDropFlowFileRequest(String requestIdentifier);
+    DropFlowFileStatus cancelDropFlowFileRequest(String requestIdentifier);
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0af1acaa/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index acf2830..8085760 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -1078,7 +1078,16 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
         builder.setLineageStartDate(flowFile.getLineageStartDate());
         builder.setComponentId(getIdentifier());
         builder.setComponentType("Connection");
-        builder.setDetails("FlowFile manually dropped; request made by " + requestor);
+        builder.setAttributes(flowFile.getAttributes(), Collections.<String, String> emptyMap());
+        builder.setDetails("Manually dropped by " + requestor);
+        builder.setSourceQueueIdentifier(getIdentifier());
+
+        final ContentClaim contentClaim = flowFile.getContentClaim();
+        if (contentClaim != null) {
+            final ResourceClaim resourceClaim = contentClaim.getResourceClaim();
+            builder.setPreviousContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), contentClaim.getOffset(), flowFile.getSize());
+        }
+
         return builder.build();
     }
 
@@ -1138,14 +1147,14 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
 
 
     @Override
-    public boolean cancelDropFlowFileRequest(final String requestIdentifier) {
+    public DropFlowFileRequest cancelDropFlowFileRequest(final String requestIdentifier) {
         final DropFlowFileRequest request = dropRequestMap.remove(requestIdentifier);
         if (request == null) {
-            return false;
+            return null;
         }
 
-        final boolean successful = request.cancel();
-        return successful;
+        request.cancel();
+        return request;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/0af1acaa/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
index 3789ea5..4b67d91 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
@@ -156,7 +156,7 @@ public class TestStandardFlowFileQueue {
         queue.poll(exp);
     }
 
-    @Test
+    @Test(timeout = 20000)
     public void testDropSwappedFlowFiles() {
         for (int i = 1; i <= 210000; i++) {
             queue.put(new TestFlowFile());
@@ -165,15 +165,12 @@ public class TestStandardFlowFileQueue {
         assertEquals(20, swapManager.swappedOut.size());
         final DropFlowFileStatus status = queue.dropFlowFiles("1", "Unit Test");
         while (status.getState() != DropFlowFileState.COMPLETE) {
-            final QueueSize queueSize = queue.size();
-            System.out.println(queueSize);
             try {
-                Thread.sleep(1000L);
+                Thread.sleep(100L);
             } catch (final Exception e) {
             }
         }
 
-        System.out.println(queue.size());
         assertEquals(0, queue.size().getObjectCount());
         assertEquals(0, queue.size().getByteCount());
         assertEquals(0, swapManager.swappedOut.size());