You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tk...@apache.org on 2015/11/03 16:36:37 UTC

[09/40] nifi git commit: NIFI-730: Added additional parameters to dropFlowFiles

NIFI-730: Added additional parameters to dropFlowFiles


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/af78354d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/af78354d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/af78354d

Branch: refs/heads/NIFI-274
Commit: af78354d84dcd72a23f1101567729a04ee008110
Parents: 09d6fe5
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue Oct 13 12:20:18 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Tue Oct 13 12:20:18 2015 -0400

----------------------------------------------------------------------
 .../controller/queue/DropFlowFileStatus.java    |  6 +++++
 .../nifi/controller/queue/FlowFileQueue.java    |  5 +++-
 .../apache/nifi/controller/queue/QueueSize.java | 14 ++++++++++
 .../nifi/controller/DropFlowFileRequest.java    | 10 ++++++++
 .../nifi/controller/StandardFlowFileQueue.java  | 27 ++++++++++----------
 5 files changed, 48 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/af78354d/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java
index 3c3be9b..7d5b9c2 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java
@@ -61,6 +61,12 @@ public interface DropFlowFileStatus {
     QueueSize getCurrentSize();
 
     /**
+     * @return a QueueSize representing the number of FlowFiles that have been dropped for this request
+     *         and the aggregate size of those FlowFiles
+     */
+    QueueSize getDroppedSize();
+
+    /**
      * @return the current state of the operation
      */
     DropFlowFileState getState();

http://git-wip-us.apache.org/repos/asf/nifi/blob/af78354d/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
index bc2f358..2d67d58 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
@@ -181,9 +181,12 @@ public interface FlowFileQueue {
      * passed to the {@link #getDropFlowFileStatus(String)} and {@link #cancelDropFlowFileStatus(String)}
      * methods in order to obtain the status later or cancel a request
      *
+     * @param requestor the entity that is requesting that the FlowFiles be dropped; this will be
+     *            included in the Provenance Events that are generated.
+     *
      * @return the status of the drop request.
      */
-    DropFlowFileStatus dropFlowFiles();
+    DropFlowFileStatus dropFlowFiles(String requestIdentifier, String requestor);
 
     /**
      * Returns the current status of a Drop FlowFile Request that was initiated via the

http://git-wip-us.apache.org/repos/asf/nifi/blob/af78354d/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java
index 528d652..7998d37 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java
@@ -48,6 +48,20 @@ public class QueueSize {
         return totalSizeBytes;
     }
 
+    /**
+     * Returns a new QueueSize that is the sum of this QueueSize and the provided QueueSize
+     * 
+     * @param other the other QueueSize to add to this QueueSize
+     * @return a new QueueSize that is the sum of this QueueSize and the provided QueueSize
+     */
+    public QueueSize add(final QueueSize other) {
+        if (other == null) {
+            return new QueueSize(objectCount, totalSizeBytes);
+        }
+
+        return new QueueSize(objectCount + other.getObjectCount(), totalSizeBytes + other.getByteCount());
+    }
+
     @Override
     public String toString() {
         return "QueueSize[FlowFiles=" + objectCount + ", ContentSize=" + NumberFormat.getNumberInstance().format(totalSizeBytes) + " Bytes]";

http://git-wip-us.apache.org/repos/asf/nifi/blob/af78354d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java
index 609fe75..4104308 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java
@@ -27,6 +27,7 @@ public class DropFlowFileRequest implements DropFlowFileStatus {
 
     private volatile QueueSize originalSize;
     private volatile QueueSize currentSize;
+    private volatile QueueSize droppedSize = new QueueSize(0, 0L);
     private volatile long lastUpdated = System.currentTimeMillis();
     private volatile Thread executionThread;
 
@@ -66,6 +67,15 @@ public class DropFlowFileRequest implements DropFlowFileStatus {
     }
 
     @Override
+    public QueueSize getDroppedSize() {
+        return droppedSize;
+    }
+
+    void setDroppedSize(final QueueSize droppedSize) {
+        this.droppedSize = droppedSize;
+    }
+
+    @Override
     public DropFlowFileState getState() {
         return state;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/af78354d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index 073e5fb..b699ceb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -29,7 +29,6 @@ import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Queue;
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
@@ -900,7 +899,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
     private final ConcurrentMap<String, DropFlowFileRequest> dropRequestMap = new ConcurrentHashMap<>();
 
     @Override
-    public DropFlowFileStatus dropFlowFiles() {
+    public DropFlowFileStatus dropFlowFiles(final String requestIdentifier, final String requestor) {
         // purge any old requests from the map just to keep it clean. But if there are very requests, which is usually the case, then don't bother
         if (dropRequestMap.size() > 10) {
             final List<String> toDrop = new ArrayList<>();
@@ -918,10 +917,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
             }
         }
 
-        // TODO: get user name!
-        final String userName = null;
-
-        final String requestIdentifier = UUID.randomUUID().toString();
         final DropFlowFileRequest dropRequest = new DropFlowFileRequest(requestIdentifier);
         final Thread t = new Thread(new Runnable() {
             @Override
@@ -932,20 +927,23 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
 
                     try {
                         final List<FlowFileRecord> activeQueueRecords = new ArrayList<>(activeQueue);
-                        drop(activeQueueRecords, userName);
+                        QueueSize droppedSize = drop(activeQueueRecords, requestor);
                         activeQueue.clear();
                         dropRequest.setCurrentSize(getQueueSize());
+                        dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
 
-                        drop(swapQueue, userName);
+                        droppedSize = drop(swapQueue, requestor);
                         swapQueue.clear();
                         dropRequest.setCurrentSize(getQueueSize());
+                        dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
 
                         final Iterator<String> swapLocationItr = swapLocations.iterator();
                         while (swapLocationItr.hasNext()) {
                             final String swapLocation = swapLocationItr.next();
                             final List<FlowFileRecord> swappedIn = swapManager.swapIn(swapLocation, StandardFlowFileQueue.this);
                             try {
-                                drop(swappedIn, userName);
+                                droppedSize = drop(swappedIn, requestor);
+                                dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
                             } catch (final Exception e) {
                                 activeQueue.addAll(swappedIn); // ensure that we don't lose the FlowFiles from our queue.
                                 throw e;
@@ -974,15 +972,16 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
         return dropRequest;
     }
 
-    private void drop(final List<FlowFileRecord> flowFiles, final String user) throws IOException {
+    private QueueSize drop(final List<FlowFileRecord> flowFiles, final String requestor) throws IOException {
         // Create a Provenance Event and a FlowFile Repository record for each FlowFile
         final List<ProvenanceEventRecord> provenanceEvents = new ArrayList<>(flowFiles.size());
         final List<RepositoryRecord> flowFileRepoRecords = new ArrayList<>(flowFiles.size());
         for (final FlowFileRecord flowFile : flowFiles) {
-            provenanceEvents.add(createDropEvent(flowFile, user));
+            provenanceEvents.add(createDropEvent(flowFile, requestor));
             flowFileRepoRecords.add(createDeleteRepositoryRecord(flowFile));
         }
 
+        long dropContentSize = 0L;
         for (final FlowFileRecord flowFile : flowFiles) {
             final ContentClaim contentClaim = flowFile.getContentClaim();
             if (contentClaim == null) {
@@ -995,20 +994,22 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
             }
 
             resourceClaimManager.decrementClaimantCount(resourceClaim);
+            dropContentSize += flowFile.getSize();
         }
 
         provRepository.registerEvents(provenanceEvents);
         flowFileRepository.updateRepository(flowFileRepoRecords);
+        return new QueueSize(flowFiles.size(), dropContentSize);
     }
 
-    private ProvenanceEventRecord createDropEvent(final FlowFileRecord flowFile, final String user) {
+    private ProvenanceEventRecord createDropEvent(final FlowFileRecord flowFile, final String requestor) {
         final ProvenanceEventBuilder builder = provRepository.eventBuilder();
         builder.fromFlowFile(flowFile);
         builder.setEventType(ProvenanceEventType.DROP);
         builder.setLineageStartDate(flowFile.getLineageStartDate());
         builder.setComponentId(getIdentifier());
         builder.setComponentType("Connection");
-        builder.setDetails("FlowFile manually dropped by user " + user);
+        builder.setDetails("FlowFile manually dropped; request made by " + requestor);
         return builder.build();
     }