You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tk...@apache.org on 2015/11/03 16:36:40 UTC

[12/40] nifi git commit: NIFI-730: Added error messages if we fail to drop FlowFiles from queue

NIFI-730: Added error messages if we fail to drop FlowFiles from queue


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/afb76afc
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/afb76afc
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/afb76afc

Branch: refs/heads/NIFI-274
Commit: afb76afcd0fd7d0c144a37621fdabc181bd42307
Parents: 72ff2a2
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue Oct 13 15:57:18 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Tue Oct 13 15:57:18 2015 -0400

----------------------------------------------------------------------
 .../controller/queue/DropFlowFileStatus.java    |  5 +++
 .../nifi/controller/DropFlowFileRequest.java    | 11 +++++
 .../nifi/controller/StandardFlowFileQueue.java  | 45 ++++++++++++++++----
 3 files changed, 53 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/afb76afc/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java
index 7d5b9c2..737fbe3 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java
@@ -70,4 +70,9 @@ public interface DropFlowFileStatus {
      * @return the current state of the operation
      */
     DropFlowFileState getState();
+
+    /**
+     * @return an explanation of why the operation failed, or <code>null</code> if the state is not {@link DropFlowFileState#FAILURE}.
+     */
+    String getFailureReason();
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/afb76afc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java
index 4104308..189fe7d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java
@@ -30,6 +30,7 @@ public class DropFlowFileRequest implements DropFlowFileStatus {
     private volatile QueueSize droppedSize = new QueueSize(0, 0L);
     private volatile long lastUpdated = System.currentTimeMillis();
     private volatile Thread executionThread;
+    private volatile String failureReason;
 
     private DropFlowFileState state = DropFlowFileState.WAITING_FOR_LOCK;
 
@@ -85,8 +86,18 @@ public class DropFlowFileRequest implements DropFlowFileStatus {
         return lastUpdated;
     }
 
+    @Override
+    public String getFailureReason() {
+        return failureReason; // volatile field, so it is read here without taking the lock that setState holds
+    }
+
     synchronized void setState(final DropFlowFileState state) {
+        setState(state, null); // no explanation supplied: this also resets failureReason to null
+    }
+
+    synchronized void setState(final DropFlowFileState state, final String explanation) {
         this.state = state;
+        this.failureReason = explanation; // stored as-is regardless of which state is being set
         this.lastUpdated = System.currentTimeMillis();
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/afb76afc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index 82c1c7e..5b137f7 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -928,14 +928,34 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
 
                     try {
                         final List<FlowFileRecord> activeQueueRecords = new ArrayList<>(activeQueue);
-                        QueueSize droppedSize = drop(activeQueueRecords, requestor);
+
+                        QueueSize droppedSize;
+                        try {
+                            droppedSize = drop(activeQueueRecords, requestor);
+                        } catch (final IOException ioe) {
+                            logger.error("Failed to drop the FlowFiles from queue {} due to {}", StandardFlowFileQueue.this.getIdentifier(), ioe.toString());
+                            logger.error("", ioe);
+
+                            dropRequest.setState(DropFlowFileState.FAILURE, "Failed to drop FlowFiles due to " + ioe.toString());
+                            return;
+                        }
+
                         activeQueue.clear();
                         activeQueueContentSize = 0;
                         activeQueueSizeRef.set(0);
                         dropRequest.setCurrentSize(getQueueSize());
                         dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
 
-                        droppedSize = drop(swapQueue, requestor);
+                        try {
+                            droppedSize = drop(swapQueue, requestor);
+                        } catch (final IOException ioe) {
+                            logger.error("Failed to drop the FlowFiles from queue {} due to {}", StandardFlowFileQueue.this.getIdentifier(), ioe.toString());
+                            logger.error("", ioe);
+
+                            dropRequest.setState(DropFlowFileState.FAILURE, "Failed to drop FlowFiles due to " + ioe.toString());
+                            return;
+                        }
+
                         swapQueue.clear();
                         dropRequest.setCurrentSize(getQueueSize());
                         dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
@@ -946,12 +966,22 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
                         final Iterator<String> swapLocationItr = swapLocations.iterator();
                         while (swapLocationItr.hasNext()) {
                             final String swapLocation = swapLocationItr.next();
-                            final List<FlowFileRecord> swappedIn = swapManager.swapIn(swapLocation, StandardFlowFileQueue.this);
+
+                            List<FlowFileRecord> swappedIn = null;
                             try {
+                                swappedIn = swapManager.swapIn(swapLocation, StandardFlowFileQueue.this);
                                 droppedSize = drop(swappedIn, requestor);
-                            } catch (final Exception e) {
-                                activeQueue.addAll(swappedIn); // ensure that we don't lose the FlowFiles from our queue.
-                                throw e;
+                            } catch (final IOException ioe) {
+                                logger.error("Failed to swap in FlowFiles from Swap File {} in order to drop the FlowFiles for Connection {} due to {}",
+                                    swapLocation, StandardFlowFileQueue.this.getIdentifier(), ioe.toString());
+                                logger.error("", ioe);
+
+                                dropRequest.setState(DropFlowFileState.FAILURE, "Failed to swap in FlowFiles from Swap File " + swapLocation + " due to " + ioe.toString());
+                                if (swappedIn != null) {
+                                    activeQueue.addAll(swappedIn); // ensure that we don't lose the FlowFiles from our queue.
+                                }
+
+                                return;
                             }
 
                             dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
@@ -963,8 +993,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
 
                         dropRequest.setState(DropFlowFileState.COMPLETE);
                     } catch (final Exception e) {
-                        // TODO: Handle adequately
-                        dropRequest.setState(DropFlowFileState.FAILURE);
+                        dropRequest.setState(DropFlowFileState.FAILURE, "Failed to drop FlowFiles due to " + e.toString());
                     }
                 } finally {
                     writeLock.unlock("Drop FlowFiles");