You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/10/13 18:20:28 UTC
nifi git commit: NIFI-730: Added additional parameters to
dropFlowFiles
Repository: nifi
Updated Branches:
refs/heads/NIFI-730 09d6fe5cd -> af78354d8
NIFI-730: Added additional parameters to dropFlowFiles
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/af78354d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/af78354d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/af78354d
Branch: refs/heads/NIFI-730
Commit: af78354d84dcd72a23f1101567729a04ee008110
Parents: 09d6fe5
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue Oct 13 12:20:18 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Tue Oct 13 12:20:18 2015 -0400
----------------------------------------------------------------------
.../controller/queue/DropFlowFileStatus.java | 6 +++++
.../nifi/controller/queue/FlowFileQueue.java | 5 +++-
.../apache/nifi/controller/queue/QueueSize.java | 14 ++++++++++
.../nifi/controller/DropFlowFileRequest.java | 10 ++++++++
.../nifi/controller/StandardFlowFileQueue.java | 27 ++++++++++----------
5 files changed, 48 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/af78354d/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java
index 3c3be9b..7d5b9c2 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java
@@ -61,6 +61,12 @@ public interface DropFlowFileStatus {
QueueSize getCurrentSize();
/**
+ * @return a QueueSize representing the number of FlowFiles that have been dropped for this request
+ * and the aggregate size of those FlowFiles
+ */
+ QueueSize getDroppedSize();
+
+ /**
* @return the current state of the operation
*/
DropFlowFileState getState();
http://git-wip-us.apache.org/repos/asf/nifi/blob/af78354d/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
index bc2f358..2d67d58 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
@@ -181,9 +181,12 @@ public interface FlowFileQueue {
* passed to the {@link #getDropFlowFileStatus(String)} and {@link #cancelDropFlowFileStatus(String)}
* methods in order to obtain the status later or cancel a request
*
+ * @param requestor the entity that is requesting that the FlowFiles be dropped; this will be
+ * included in the Provenance Events that are generated.
+ *
* @return the status of the drop request.
*/
- DropFlowFileStatus dropFlowFiles();
+ DropFlowFileStatus dropFlowFiles(String requestIdentifier, String requestor);
/**
* Returns the current status of a Drop FlowFile Request that was initiated via the
http://git-wip-us.apache.org/repos/asf/nifi/blob/af78354d/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java
index 528d652..7998d37 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java
@@ -48,6 +48,20 @@ public class QueueSize {
return totalSizeBytes;
}
+ /**
+ * Returns a new QueueSize that is the sum of this QueueSize and the provided QueueSize
+ *
+ * @param other the other QueueSize to add to this QueueSize
+ * @return a new QueueSize that is the sum of this QueueSize and the provided QueueSize
+ */
+ public QueueSize add(final QueueSize other) {
+ if (other == null) {
+ return new QueueSize(objectCount, totalSizeBytes);
+ }
+
+ return new QueueSize(objectCount + other.getObjectCount(), totalSizeBytes + other.getByteCount());
+ }
+
@Override
public String toString() {
return "QueueSize[FlowFiles=" + objectCount + ", ContentSize=" + NumberFormat.getNumberInstance().format(totalSizeBytes) + " Bytes]";
http://git-wip-us.apache.org/repos/asf/nifi/blob/af78354d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java
index 609fe75..4104308 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java
@@ -27,6 +27,7 @@ public class DropFlowFileRequest implements DropFlowFileStatus {
private volatile QueueSize originalSize;
private volatile QueueSize currentSize;
+ private volatile QueueSize droppedSize = new QueueSize(0, 0L);
private volatile long lastUpdated = System.currentTimeMillis();
private volatile Thread executionThread;
@@ -66,6 +67,15 @@ public class DropFlowFileRequest implements DropFlowFileStatus {
}
@Override
+ public QueueSize getDroppedSize() {
+ return droppedSize;
+ }
+
+ void setDroppedSize(final QueueSize droppedSize) {
+ this.droppedSize = droppedSize;
+ }
+
+ @Override
public DropFlowFileState getState() {
return state;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/af78354d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index 073e5fb..b699ceb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -29,7 +29,6 @@ import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
-import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
@@ -900,7 +899,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
private final ConcurrentMap<String, DropFlowFileRequest> dropRequestMap = new ConcurrentHashMap<>();
@Override
- public DropFlowFileStatus dropFlowFiles() {
+ public DropFlowFileStatus dropFlowFiles(final String requestIdentifier, final String requestor) {
// purge any old requests from the map just to keep it clean. But if there are very requests, which is usually the case, then don't bother
if (dropRequestMap.size() > 10) {
final List<String> toDrop = new ArrayList<>();
@@ -918,10 +917,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
}
}
- // TODO: get user name!
- final String userName = null;
-
- final String requestIdentifier = UUID.randomUUID().toString();
final DropFlowFileRequest dropRequest = new DropFlowFileRequest(requestIdentifier);
final Thread t = new Thread(new Runnable() {
@Override
@@ -932,20 +927,23 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
try {
final List<FlowFileRecord> activeQueueRecords = new ArrayList<>(activeQueue);
- drop(activeQueueRecords, userName);
+ QueueSize droppedSize = drop(activeQueueRecords, requestor);
activeQueue.clear();
dropRequest.setCurrentSize(getQueueSize());
+ dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
- drop(swapQueue, userName);
+ droppedSize = drop(swapQueue, requestor);
swapQueue.clear();
dropRequest.setCurrentSize(getQueueSize());
+ dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
final Iterator<String> swapLocationItr = swapLocations.iterator();
while (swapLocationItr.hasNext()) {
final String swapLocation = swapLocationItr.next();
final List<FlowFileRecord> swappedIn = swapManager.swapIn(swapLocation, StandardFlowFileQueue.this);
try {
- drop(swappedIn, userName);
+ droppedSize = drop(swappedIn, requestor);
+ dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
} catch (final Exception e) {
activeQueue.addAll(swappedIn); // ensure that we don't lose the FlowFiles from our queue.
throw e;
@@ -974,15 +972,16 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
return dropRequest;
}
- private void drop(final List<FlowFileRecord> flowFiles, final String user) throws IOException {
+ private QueueSize drop(final List<FlowFileRecord> flowFiles, final String requestor) throws IOException {
// Create a Provenance Event and a FlowFile Repository record for each FlowFile
final List<ProvenanceEventRecord> provenanceEvents = new ArrayList<>(flowFiles.size());
final List<RepositoryRecord> flowFileRepoRecords = new ArrayList<>(flowFiles.size());
for (final FlowFileRecord flowFile : flowFiles) {
- provenanceEvents.add(createDropEvent(flowFile, user));
+ provenanceEvents.add(createDropEvent(flowFile, requestor));
flowFileRepoRecords.add(createDeleteRepositoryRecord(flowFile));
}
+ long dropContentSize = 0L;
for (final FlowFileRecord flowFile : flowFiles) {
final ContentClaim contentClaim = flowFile.getContentClaim();
if (contentClaim == null) {
@@ -995,20 +994,22 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
}
resourceClaimManager.decrementClaimantCount(resourceClaim);
+ dropContentSize += flowFile.getSize();
}
provRepository.registerEvents(provenanceEvents);
flowFileRepository.updateRepository(flowFileRepoRecords);
+ return new QueueSize(flowFiles.size(), dropContentSize);
}
- private ProvenanceEventRecord createDropEvent(final FlowFileRecord flowFile, final String user) {
+ private ProvenanceEventRecord createDropEvent(final FlowFileRecord flowFile, final String requestor) {
final ProvenanceEventBuilder builder = provRepository.eventBuilder();
builder.fromFlowFile(flowFile);
builder.setEventType(ProvenanceEventType.DROP);
builder.setLineageStartDate(flowFile.getLineageStartDate());
builder.setComponentId(getIdentifier());
builder.setComponentType("Connection");
- builder.setDetails("FlowFile manually dropped by user " + user);
+ builder.setDetails("FlowFile manually dropped; request made by " + requestor);
return builder.build();
}