You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tk...@apache.org on 2015/11/03 16:36:40 UTC
[12/40] nifi git commit: NIFI-730: Added error messages if we fail to drop FlowFiles from queue
NIFI-730: Added error messages if we fail to drop FlowFiles from queue
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/afb76afc
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/afb76afc
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/afb76afc
Branch: refs/heads/NIFI-274
Commit: afb76afcd0fd7d0c144a37621fdabc181bd42307
Parents: 72ff2a2
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue Oct 13 15:57:18 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Tue Oct 13 15:57:18 2015 -0400
----------------------------------------------------------------------
.../controller/queue/DropFlowFileStatus.java | 5 +++
.../nifi/controller/DropFlowFileRequest.java | 11 +++++
.../nifi/controller/StandardFlowFileQueue.java | 45 ++++++++++++++++----
3 files changed, 53 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/afb76afc/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java
index 7d5b9c2..737fbe3 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java
@@ -70,4 +70,9 @@ public interface DropFlowFileStatus {
* @return the current state of the operation
*/
DropFlowFileState getState();
+
+ /**
+ * @return the reason that the state is set to a Failure state, or <code>null</code> if the state is not {@link DropFlowFileState#FAILURE}.
+ */
+ String getFailureReason();
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/afb76afc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java
index 4104308..189fe7d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java
@@ -30,6 +30,7 @@ public class DropFlowFileRequest implements DropFlowFileStatus {
private volatile QueueSize droppedSize = new QueueSize(0, 0L);
private volatile long lastUpdated = System.currentTimeMillis();
private volatile Thread executionThread;
+ private volatile String failureReason;
private DropFlowFileState state = DropFlowFileState.WAITING_FOR_LOCK;
@@ -85,8 +86,18 @@ public class DropFlowFileRequest implements DropFlowFileStatus {
return lastUpdated;
}
+ @Override
+ public String getFailureReason() {
+ return failureReason;
+ }
+
synchronized void setState(final DropFlowFileState state) {
+ setState(state, null);
+ }
+
+ synchronized void setState(final DropFlowFileState state, final String explanation) {
this.state = state;
+ this.failureReason = explanation;
this.lastUpdated = System.currentTimeMillis();
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/afb76afc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index 82c1c7e..5b137f7 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -928,14 +928,34 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
try {
final List<FlowFileRecord> activeQueueRecords = new ArrayList<>(activeQueue);
- QueueSize droppedSize = drop(activeQueueRecords, requestor);
+
+ QueueSize droppedSize;
+ try {
+ droppedSize = drop(activeQueueRecords, requestor);
+ } catch (final IOException ioe) {
+ logger.error("Failed to drop the FlowFiles from queue {} due to {}", StandardFlowFileQueue.this.getIdentifier(), ioe.toString());
+ logger.error("", ioe);
+
+ dropRequest.setState(DropFlowFileState.FAILURE, "Failed to drop FlowFiles due to " + ioe.toString());
+ return;
+ }
+
activeQueue.clear();
activeQueueContentSize = 0;
activeQueueSizeRef.set(0);
dropRequest.setCurrentSize(getQueueSize());
dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
- droppedSize = drop(swapQueue, requestor);
+ try {
+ droppedSize = drop(swapQueue, requestor);
+ } catch (final IOException ioe) {
+ logger.error("Failed to drop the FlowFiles from queue {} due to {}", StandardFlowFileQueue.this.getIdentifier(), ioe.toString());
+ logger.error("", ioe);
+
+ dropRequest.setState(DropFlowFileState.FAILURE, "Failed to drop FlowFiles due to " + ioe.toString());
+ return;
+ }
+
swapQueue.clear();
dropRequest.setCurrentSize(getQueueSize());
dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
@@ -946,12 +966,22 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
final Iterator<String> swapLocationItr = swapLocations.iterator();
while (swapLocationItr.hasNext()) {
final String swapLocation = swapLocationItr.next();
- final List<FlowFileRecord> swappedIn = swapManager.swapIn(swapLocation, StandardFlowFileQueue.this);
+
+ List<FlowFileRecord> swappedIn = null;
try {
+ swappedIn = swapManager.swapIn(swapLocation, StandardFlowFileQueue.this);
droppedSize = drop(swappedIn, requestor);
- } catch (final Exception e) {
- activeQueue.addAll(swappedIn); // ensure that we don't lose the FlowFiles from our queue.
- throw e;
+ } catch (final IOException ioe) {
+ logger.error("Failed to swap in FlowFiles from Swap File {} in order to drop the FlowFiles for Connection {} due to {}",
+ swapLocation, StandardFlowFileQueue.this.getIdentifier(), ioe.toString());
+ logger.error("", ioe);
+
+ dropRequest.setState(DropFlowFileState.FAILURE, "Failed to swap in FlowFiles from Swap File " + swapLocation + " due to " + ioe.toString());
+ if (swappedIn != null) {
+ activeQueue.addAll(swappedIn); // ensure that we don't lose the FlowFiles from our queue.
+ }
+
+ return;
}
dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
@@ -963,8 +993,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
dropRequest.setState(DropFlowFileState.COMPLETE);
} catch (final Exception e) {
- // TODO: Handle adequately
- dropRequest.setState(DropFlowFileState.FAILURE);
+ dropRequest.setState(DropFlowFileState.FAILURE, "Failed to drop FlowFiles due to " + e.toString());
}
} finally {
writeLock.unlock("Drop FlowFiles");