You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2019/11/06 14:53:36 UTC

[nifi] branch master updated: NIFI-6846: If an Exception is thrown while a Processor is writing to a FlowFile, but that Content Claim is not yet eligible for destruction, mark it as a transient claim on the RepositoryRecord so that if it's available when the FlowFile Repository is checkpointed, then it will be cleaned up then

This is an automated email from the ASF dual-hosted git repository.

mcgilman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 49b7a7c  NIFI-6846: If an Exception is thrown while a Processor is writing to a FlowFile, but that Content Claim is not yet eligible for destruction, mark it as a transient claim on the RepositoryRecord so that if it's available when the FlowFile Repository is checkpointed, then it will be cleaned up then
49b7a7c is described below

commit 49b7a7cd6b2645188a732bc57122d1757516e2bd
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Tue Nov 5 13:36:45 2019 -0500

    NIFI-6846: If an Exception is thrown while a Processor is writing to a FlowFile, but that Content Claim is not yet eligible for destruction, mark it as a transient claim on the RepositoryRecord so that if it's available when the FlowFile Repository is checkpointed, then it will be cleaned up then
    
    This closes #3872
---
 .../repository/StandardProcessSession.java         | 51 ++++++++++++----------
 .../repository/WriteAheadFlowFileRepository.java   |  3 ++
 2 files changed, 32 insertions(+), 22 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 5684ab6..f7249c0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -1116,15 +1116,22 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
      *
      * @param claim claim to destroy
      */
-    private void destroyContent(final ContentClaim claim) {
+    private void destroyContent(final ContentClaim claim, final StandardRepositoryRecord repoRecord) {
         if (claim == null) {
             return;
         }
 
         final int decrementedClaimCount = context.getContentRepository().decrementClaimantCount(claim);
+        boolean removed = false;
         if (decrementedClaimCount <= 0) {
             resetWriteClaims(); // Have to ensure that we are not currently writing to the claim before we can destroy it.
-            context.getContentRepository().remove(claim);
+            removed = context.getContentRepository().remove(claim);
+        }
+
+        // If we were not able to remove the content claim yet, mark it as a transient claim so that it will be cleaned up when the
+        // FlowFile Repository is updated if it's available for cleanup at that time.
+        if (!removed) {
+            repoRecord.addTransientClaim(claim);
         }
     }
 
@@ -2554,14 +2561,14 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
                 bytesRead += readCount;
             }
         } catch (final ContentNotFoundException nfe) {
-            destroyContent(newClaim);
+            destroyContent(newClaim, destinationRecord);
             handleContentNotFound(nfe, destinationRecord);
             handleContentNotFound(nfe, sourceRecords);
         } catch (final IOException ioe) {
-            destroyContent(newClaim);
+            destroyContent(newClaim, destinationRecord);
             throw new FlowFileAccessException("Failed to merge " + sources.size() + " into " + destination + " due to " + ioe.toString(), ioe);
         } catch (final Throwable t) {
-            destroyContent(newClaim);
+            destroyContent(newClaim, destinationRecord);
             throw t;
         }
 
@@ -2690,20 +2697,20 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             return createTaskTerminationStream(errorHandlingOutputStream);
         } catch (final ContentNotFoundException nfe) {
             resetWriteClaims(); // need to reset write claim before we can remove the claim
-            destroyContent(newClaim);
+            destroyContent(newClaim, record);
             handleContentNotFound(nfe, record);
             throw nfe;
         } catch (final FlowFileAccessException ffae) {
             resetWriteClaims(); // need to reset write claim before we can remove the claim
-            destroyContent(newClaim);
+            destroyContent(newClaim, record);
             throw ffae;
         } catch (final IOException ioe) {
             resetWriteClaims(); // need to reset write claim before we can remove the claim
-            destroyContent(newClaim);
+            destroyContent(newClaim, record);
             throw new ProcessException("IOException thrown from " + connectableDescription + ": " + ioe.toString(), ioe);
         } catch (final Throwable t) {
             resetWriteClaims(); // need to reset write claim before we can remove the claim
-            destroyContent(newClaim);
+            destroyContent(newClaim, record);
             throw t;
         }
     }
@@ -2737,19 +2744,19 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             }
         } catch (final ContentNotFoundException nfe) {
             resetWriteClaims(); // need to reset write claim before we can remove the claim
-            destroyContent(newClaim);
+            destroyContent(newClaim, record);
             handleContentNotFound(nfe, record);
         } catch (final FlowFileAccessException ffae) {
             resetWriteClaims(); // need to reset write claim before we can remove the claim
-            destroyContent(newClaim);
+            destroyContent(newClaim, record);
             throw ffae;
         } catch (final IOException ioe) {
             resetWriteClaims(); // need to reset write claim before we can remove the claim
-            destroyContent(newClaim);
+            destroyContent(newClaim, record);
             throw new ProcessException("IOException thrown from " + connectableDescription + ": " + ioe.toString(), ioe);
         } catch (final Throwable t) {
             resetWriteClaims(); // need to reset write claim before we can remove the claim
-            destroyContent(newClaim);
+            destroyContent(newClaim, record);
             throw t;
         }
 
@@ -2835,7 +2842,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             // whenever the FlowFile is removed, the claim count will be decremented; if we decremented
             // it here also, we would be decrementing the claimant count twice!
             if (newClaim != oldClaim) {
-                destroyContent(newClaim);
+                destroyContent(newClaim, record);
             }
 
             handleContentNotFound(nfe, record);
@@ -2844,7 +2851,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
             // See above explanation for why this is done only if newClaim != oldClaim
             if (newClaim != oldClaim) {
-                destroyContent(newClaim);
+                destroyContent(newClaim, record);
             }
 
             throw new ProcessException("IOException thrown from " + connectableDescription + ": " + ioe.toString(), ioe);
@@ -2853,7 +2860,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
             // See above explanation for why this is done only if newClaim != oldClaim
             if (newClaim != oldClaim) {
-                destroyContent(newClaim);
+                destroyContent(newClaim, record);
             }
 
             throw t;
@@ -3004,16 +3011,16 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
                 }
             }
         } catch (final ContentNotFoundException nfe) {
-            destroyContent(newClaim);
+            destroyContent(newClaim, record);
             handleContentNotFound(nfe, record);
         } catch (final IOException ioe) {
-            destroyContent(newClaim);
+            destroyContent(newClaim, record);
             throw new ProcessException("IOException thrown from " + connectableDescription + ": " + ioe.toString(), ioe);
         } catch (final FlowFileAccessException ffae) {
-            destroyContent(newClaim);
+            destroyContent(newClaim, record);
             throw ffae;
         } catch (final Throwable t) {
-            destroyContent(newClaim);
+            destroyContent(newClaim, record);
             throw t;
         }
 
@@ -3060,7 +3067,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             bytesWritten += newSize;
             bytesRead += newSize;
         } catch (final Throwable t) {
-            destroyContent(newClaim);
+            destroyContent(newClaim, record);
             throw new FlowFileAccessException("Failed to import data from " + source + " for " + destination + " due to " + t.toString(), t);
         }
 
@@ -3104,7 +3111,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             }
         } catch (final Throwable t) {
             if (newClaim != null) {
-                destroyContent(newClaim);
+                destroyContent(newClaim, record);
             }
 
             throw new FlowFileAccessException("Failed to import data from " + source + " for " + destination + " due to " + t.toString(), t);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
index 2ba59c7..dcd0f32 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
@@ -511,7 +511,10 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
                 swapLocationsRemoved.add(swapLocation);
                 swapLocationsAdded.remove(swapLocation);
             }
+        }
 
+        // Once the content claim counts have been updated for all records, collect any transient claims that are eligible for destruction
+        for (final RepositoryRecord record : repositoryRecords) {
             final List<ContentClaim> transientClaims = record.getTransientClaims();
             if (transientClaims != null) {
                 for (final ContentClaim transientClaim : transientClaims) {