You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2016/07/24 21:00:44 UTC

nifi git commit: NIFI-2376 This closes #713. Ensure that we don't decrement claimant count more than once when append() throws an Exception

Repository: nifi
Updated Branches:
  refs/heads/master 6932a53ec -> 4e08ea652


NIFI-2376 This closes #713. Ensure that we don't decrement claimant count more than once when append() throws an Exception


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/4e08ea65
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/4e08ea65
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/4e08ea65

Branch: refs/heads/master
Commit: 4e08ea65254edd80d487ffc17b5046c37184467c
Parents: 6932a53
Author: Mark Payne <ma...@hotmail.com>
Authored: Sat Jul 23 15:05:20 2016 -0400
Committer: joewitt <jo...@apache.org>
Committed: Sun Jul 24 17:00:21 2016 -0400

----------------------------------------------------------------------
 .../nifi/controller/StandardFlowFileQueue.java  |   2 +-
 .../repository/FileSystemRepository.java        |   5 +-
 .../repository/StandardProcessSession.java      |  53 ++++++--
 .../repository/TestFileSystemRepository.java    |  49 ++++++++
 .../repository/TestStandardProcessSession.java  | 120 ++++++++++++++++++-
 5 files changed, 212 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/4e08ea65/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index f391da5..27bbd69 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -79,7 +79,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
  * processing. Must be thread safe.
  *
  */
-public final class StandardFlowFileQueue implements FlowFileQueue {
+public class StandardFlowFileQueue implements FlowFileQueue {
 
     public static final int MAX_EXPIRED_RECORDS_PER_ITERATION = 100000;
     public static final int SWAP_RECORD_POLL_SIZE = 10000;

http://git-wip-us.apache.org/repos/asf/nifi/blob/4e08ea65/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
index 673440f..e350a90 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
@@ -964,7 +964,7 @@ public class FileSystemRepository implements ContentRepository {
                     final boolean enqueued = writableClaimQueue.offer(pair);
 
                     if (enqueued) {
-                        LOG.debug("Claim length less than max; Adding {} back to writableClaimStreams", this);
+                        LOG.debug("Claim length less than max; Leaving {} in writableClaimStreams map", this);
                     } else {
                         writableClaimStreams.remove(scc.getResourceClaim());
                         bcos.close();
@@ -1103,7 +1103,8 @@ public class FileSystemRepository implements ContentRepository {
         return Files.exists(getArchivePath(contentClaim.getResourceClaim()));
     }
 
-    private boolean archive(final ResourceClaim claim) throws IOException {
+    // visible for testing
+    boolean archive(final ResourceClaim claim) throws IOException {
         if (!archiveData) {
             return false;
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/4e08ea65/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 0a2f8c9..f6cb8a1 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -330,13 +330,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
                 if (record.isMarkedForDelete()) {
                     // if the working claim is not the same as the original claim, we can immediately destroy the working claim
                     // because it was created in this session and is to be deleted. We don't need to wait for the FlowFile Repo to sync.
-                    removeContent(record.getWorkingClaim());
+                    decrementClaimCount(record.getWorkingClaim());
 
                     if (record.getOriginalClaim() != null && !record.getOriginalClaim().equals(record.getWorkingClaim())) {
                         // if working & original claim are same, don't remove twice; we only want to remove the original
                         // if it's different from the working. Otherwise, we remove two claimant counts. This causes
                         // an issue if we only updated the FlowFile attributes.
-                        removeContent(record.getOriginalClaim());
+                        decrementClaimCount(record.getOriginalClaim());
                     }
                     final long flowFileLife = System.currentTimeMillis() - flowFile.getEntryDate();
                     final Connectable connectable = context.getConnectable();
@@ -344,7 +344,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
                     LOG.info("{} terminated by {}; life of FlowFile = {} ms", new Object[] {flowFile, terminator, flowFileLife});
                 } else if (record.isWorking() && record.getWorkingClaim() != record.getOriginalClaim()) {
                     // records which have been updated - remove original if exists
-                    removeContent(record.getOriginalClaim());
+                    decrementClaimCount(record.getOriginalClaim());
                 }
             }
 
@@ -923,12 +923,12 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         final Set<StandardRepositoryRecord> transferRecords = new HashSet<>();
         for (final StandardRepositoryRecord record : recordsToHandle) {
             if (record.isMarkedForAbort()) {
-                removeContent(record.getWorkingClaim());
+                decrementClaimCount(record.getWorkingClaim());
                 if (record.getCurrentClaim() != null && !record.getCurrentClaim().equals(record.getWorkingClaim())) {
                     // if working & original claim are same, don't remove twice; we only want to remove the original
                     // if it's different from the working. Otherwise, we remove two claimant counts. This causes
                     // an issue if we only updated the flowfile attributes.
-                    removeContent(record.getCurrentClaim());
+                    decrementClaimCount(record.getCurrentClaim());
                 }
                 abortedRecords.add(record);
             } else {
@@ -1020,7 +1020,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
     }
 
 
-    private void removeContent(final ContentClaim claim) {
+    private void decrementClaimCount(final ContentClaim claim) {
         if (claim == null) {
             return;
         }
@@ -1733,7 +1733,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             record.markForDelete();
             expiredRecords.add(record);
             expiredReporter.expire(flowFile, "Expiration Threshold = " + connection.getFlowFileQueue().getFlowFileExpiration());
-            removeContent(flowFile.getContentClaim());
+            decrementClaimCount(flowFile.getContentClaim());
 
             final long flowFileLife = System.currentTimeMillis() - flowFile.getEntryDate();
             final Object terminator = connectable instanceof ProcessorNode ? ((ProcessorNode) connectable).getProcessor() : connectable;
@@ -2198,9 +2198,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
                 originalByteWrittenCount = outStream.getBytesWritten();
 
                 // wrap our OutputStreams so that the processor cannot close it
-                try (final OutputStream disableOnClose = new DisableOnCloseOutputStream(outStream)) {
+                try (final OutputStream disableOnClose = new DisableOnCloseOutputStream(outStream);
+                    final OutputStream flowFileAccessOutStream = new FlowFileAccessOutputStream(disableOnClose, source)) {
                     recursionSet.add(source);
-                    writer.process(disableOnClose);
+                    writer.process(flowFileAccessOutStream);
                 } finally {
                     recursionSet.remove(source);
                 }
@@ -2210,15 +2211,37 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             newSize = outStream.getBytesWritten();
         } catch (final ContentNotFoundException nfe) {
             resetWriteClaims(); // need to reset write claim before we can remove the claim
-            destroyContent(newClaim);
+
+            // If the content claim changed, then we should destroy the new one. We do this
+            // because the new content claim will never get set as the 'working claim' for the FlowFile
+            // record since we will throw an Exception. As a result, we need to ensure that we have
+            // appropriately decremented the claimant count and can destroy the content if it is no
+            // longer in use. However, it is critical that we do this ONLY if the content claim has
+            // changed. Otherwise, the FlowFile already has a reference to this Content Claim and
+            // whenever the FlowFile is removed, the claim count will be decremented; if we decremented
+            // it here also, we would be decrementing the claimant count twice!
+            if (newClaim != oldClaim) {
+                destroyContent(newClaim);
+            }
+
             handleContentNotFound(nfe, record);
         } catch (final IOException ioe) {
             resetWriteClaims(); // need to reset write claim before we can remove the claim
-            destroyContent(newClaim);
-            throw new FlowFileAccessException("Exception in callback: " + ioe.toString(), ioe);
+
+            // See above explanation for why this is done only if newClaim != oldClaim
+            if (newClaim != oldClaim) {
+                destroyContent(newClaim);
+            }
+
+            throw new ProcessException("IOException thrown from " + connectableDescription + ": " + ioe.toString(), ioe);
         } catch (final Throwable t) {
             resetWriteClaims(); // need to reset write claim before we can remove the claim
-            destroyContent(newClaim);
+
+            // See above explanation for why this is done only if newClaim != oldClaim
+            if (newClaim != oldClaim) {
+                destroyContent(newClaim);
+            }
+
             throw t;
         } finally {
             if (outStream != null) {
@@ -2227,6 +2250,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             }
         }
 
+        // If the record already has a working claim, and this is the first time that we are appending to the FlowFile,
+        // destroy the current working claim because it is a temporary claim that
+        // is no longer going to be used, as we are about to set a new working claim. This would happen, for instance, if
+        // the FlowFile was written to, via #write() and then append() was called.
         if (newClaim != oldClaim) {
             removeTemporaryClaim(record);
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/4e08ea65/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
index c00b91a..b36a5e6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
@@ -495,6 +495,55 @@ public class TestFileSystemRepository {
     }
 
 
+    @Test
+    public void testWriteCannotProvideNullOutput() throws IOException {
+        FileSystemRepository repository = null;
+        try {
+            final List<Path> archivedPathsWithOpenStream = Collections.synchronizedList(new ArrayList<Path>());
+
+            // We are creating our own 'local' repository in this test so shut down the one created in the setup() method
+            shutdown();
+
+            repository = new FileSystemRepository() {
+                @Override
+                protected boolean archive(Path curPath) throws IOException {
+                    if (getOpenStreamCount() > 0) {
+                        archivedPathsWithOpenStream.add(curPath);
+                    }
+
+                    return true;
+                }
+            };
+
+            final StandardResourceClaimManager claimManager = new StandardResourceClaimManager();
+            repository.initialize(claimManager);
+            repository.purge();
+
+            final ContentClaim claim = repository.create(false);
+
+            assertEquals(1, claimManager.getClaimantCount(claim.getResourceClaim()));
+
+            int claimantCount = claimManager.decrementClaimantCount(claim.getResourceClaim());
+            assertEquals(0, claimantCount);
+            assertTrue(archivedPathsWithOpenStream.isEmpty());
+
+            OutputStream out = repository.write(claim);
+            out.close();
+            repository.decrementClaimantCount(claim);
+
+            ContentClaim claim2 = repository.create(false);
+            assertEquals(claim.getResourceClaim(), claim2.getResourceClaim());
+            out = repository.write(claim2);
+
+            final boolean archived = repository.archive(claim.getResourceClaim());
+            assertFalse(archived);
+        } finally {
+            if (repository != null) {
+                repository.shutdown();
+            }
+        }
+    }
+
     /**
      * We have encountered a situation where the File System Repo is moving files to archive and then eventually
      * aging them off while there is still an open file handle. This test is meant to replicate the conditions under

http://git-wip-us.apache.org/repos/asf/nifi/blob/4e08ea65/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index c1bcacd..2f3bff5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -68,6 +68,7 @@ import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.processor.FlowFileFilter;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.FlowFileAccessException;
 import org.apache.nifi.processor.exception.MissingFlowFileException;
@@ -144,7 +145,8 @@ public class TestStandardProcessSession {
         final ProcessScheduler processScheduler = Mockito.mock(ProcessScheduler.class);
 
         final FlowFileSwapManager swapManager = Mockito.mock(FlowFileSwapManager.class);
-        flowFileQueue = new StandardFlowFileQueue("1", connection, flowFileRepo, provenanceRepo, null, processScheduler, swapManager, null, 10000);
+        final StandardFlowFileQueue actualQueue = new StandardFlowFileQueue("1", connection, flowFileRepo, provenanceRepo, null, processScheduler, swapManager, null, 10000);
+        flowFileQueue = Mockito.spy(actualQueue);
         when(connection.getFlowFileQueue()).thenReturn(flowFileQueue);
 
         Mockito.doAnswer(new Answer<Object>() {
@@ -207,6 +209,71 @@ public class TestStandardProcessSession {
     }
 
     @Test
+    public void testAppendToChildThrowsIOExceptionThenRemove() throws IOException {
+        final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+            .id(1000L)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .build();
+        flowFileQueue.put(flowFileRecord);
+        FlowFile original = session.get();
+        assertNotNull(original);
+
+        FlowFile child = session.create(original);
+        child = session.append(child, out -> out.write("hello".getBytes()));
+
+        // Force an IOException. This will decrement our claim count for the resource claim.
+        try {
+            child = session.append(child, out -> {
+                throw new IOException();
+            });
+            Assert.fail("append() callback threw IOException but it was not wrapped in ProcessException");
+        } catch (final ProcessException pe) {
+            // expected
+        }
+
+        session.remove(child);
+        session.transfer(original);
+        session.commit();
+
+        final int numClaims = contentRepo.getExistingClaims().size();
+        assertEquals(0, numClaims);
+    }
+
+    @Test
+    public void testWriteForChildThrowsIOExceptionThenRemove() throws IOException {
+        final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+            .id(1000L)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .build();
+        flowFileQueue.put(flowFileRecord);
+        FlowFile original = session.get();
+        assertNotNull(original);
+
+        FlowFile child = session.create(original);
+        // Force an IOException. This will decrement our claim count for the resource claim.
+        try {
+            child = session.write(child, out -> out.write("hello".getBytes()));
+
+            child = session.write(child, out -> {
+                throw new IOException();
+            });
+            Assert.fail("write() callback threw IOException but it was not wrapped in ProcessException");
+        } catch (final ProcessException pe) {
+            // expected
+        }
+
+        session.remove(child);
+        session.transfer(original);
+        session.commit();
+
+        final int numClaims = contentRepo.getExistingClaims().size();
+        assertEquals(0, numClaims);
+    }
+
+
+    @Test
     public void testModifyContentThenRollback() throws IOException {
         assertEquals(0, contentRepo.getExistingClaims().size());
 
@@ -807,6 +874,57 @@ public class TestStandardProcessSession {
     }
 
     @Test
+    public void testAppendDoesNotDecrementContentClaimIfNotNeeded() {
+        FlowFile flowFile = session.create();
+
+        session.append(flowFile, new OutputStreamCallback() {
+            @Override
+            public void process(OutputStream out) throws IOException {
+                out.write("hello".getBytes());
+            }
+        });
+
+        final Set<ContentClaim> existingClaims = contentRepo.getExistingClaims();
+        assertEquals(1, existingClaims.size());
+        final ContentClaim claim = existingClaims.iterator().next();
+
+        final int countAfterAppend = contentRepo.getClaimantCount(claim);
+        assertEquals(1, countAfterAppend);
+    }
+
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testExpireDecrementsClaimsOnce() throws IOException {
+        final ContentClaim contentClaim = contentRepo.create(false);
+
+        final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .contentClaim(contentClaim)
+            .build();
+
+        Mockito.doAnswer(new Answer<List<FlowFileRecord>>() {
+            int iterations = 0;
+
+            @Override
+            public List<FlowFileRecord> answer(InvocationOnMock invocation) throws Throwable {
+                if (iterations++ == 0) {
+                    final Set<FlowFileRecord> expired = invocation.getArgumentAt(1, Set.class);
+                    expired.add(flowFileRecord);
+                }
+
+                return null;
+            }
+        }).when(flowFileQueue).poll(Mockito.any(FlowFileFilter.class), Mockito.any(Set.class));
+
+        session.expireFlowFiles();
+        session.commit(); // if the content claim count is decremented to less than 0, an exception will be thrown.
+
+        assertEquals(1L, contentRepo.getClaimsRemoved());
+    }
+
+    @Test
     public void testManyFilesOpened() throws IOException {
 
         StandardProcessSession[] standardProcessSessions = new StandardProcessSession[100000];