You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2016/07/24 21:00:44 UTC
nifi git commit: NIFI-2376 This closes #713. Ensure that we don't
decrement claimant count more than once when append() throws an Exception
Repository: nifi
Updated Branches:
refs/heads/master 6932a53ec -> 4e08ea652
NIFI-2376 This closes #713. Ensure that we don't decrement claimant count more than once when append() throws an Exception
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/4e08ea65
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/4e08ea65
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/4e08ea65
Branch: refs/heads/master
Commit: 4e08ea65254edd80d487ffc17b5046c37184467c
Parents: 6932a53
Author: Mark Payne <ma...@hotmail.com>
Authored: Sat Jul 23 15:05:20 2016 -0400
Committer: joewitt <jo...@apache.org>
Committed: Sun Jul 24 17:00:21 2016 -0400
----------------------------------------------------------------------
.../nifi/controller/StandardFlowFileQueue.java | 2 +-
.../repository/FileSystemRepository.java | 5 +-
.../repository/StandardProcessSession.java | 53 ++++++--
.../repository/TestFileSystemRepository.java | 49 ++++++++
.../repository/TestStandardProcessSession.java | 120 ++++++++++++++++++-
5 files changed, 212 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/4e08ea65/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index f391da5..27bbd69 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -79,7 +79,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
* processing. Must be thread safe.
*
*/
-public final class StandardFlowFileQueue implements FlowFileQueue {
+public class StandardFlowFileQueue implements FlowFileQueue {
public static final int MAX_EXPIRED_RECORDS_PER_ITERATION = 100000;
public static final int SWAP_RECORD_POLL_SIZE = 10000;
http://git-wip-us.apache.org/repos/asf/nifi/blob/4e08ea65/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
index 673440f..e350a90 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
@@ -964,7 +964,7 @@ public class FileSystemRepository implements ContentRepository {
final boolean enqueued = writableClaimQueue.offer(pair);
if (enqueued) {
- LOG.debug("Claim length less than max; Adding {} back to writableClaimStreams", this);
+ LOG.debug("Claim length less than max; Leaving {} in writableClaimStreams map", this);
} else {
writableClaimStreams.remove(scc.getResourceClaim());
bcos.close();
@@ -1103,7 +1103,8 @@ public class FileSystemRepository implements ContentRepository {
return Files.exists(getArchivePath(contentClaim.getResourceClaim()));
}
- private boolean archive(final ResourceClaim claim) throws IOException {
+ // visible for testing
+ boolean archive(final ResourceClaim claim) throws IOException {
if (!archiveData) {
return false;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/4e08ea65/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 0a2f8c9..f6cb8a1 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -330,13 +330,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
if (record.isMarkedForDelete()) {
// if the working claim is not the same as the original claim, we can immediately destroy the working claim
// because it was created in this session and is to be deleted. We don't need to wait for the FlowFile Repo to sync.
- removeContent(record.getWorkingClaim());
+ decrementClaimCount(record.getWorkingClaim());
if (record.getOriginalClaim() != null && !record.getOriginalClaim().equals(record.getWorkingClaim())) {
// if working & original claim are same, don't remove twice; we only want to remove the original
// if it's different from the working. Otherwise, we remove two claimant counts. This causes
// an issue if we only updated the FlowFile attributes.
- removeContent(record.getOriginalClaim());
+ decrementClaimCount(record.getOriginalClaim());
}
final long flowFileLife = System.currentTimeMillis() - flowFile.getEntryDate();
final Connectable connectable = context.getConnectable();
@@ -344,7 +344,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
LOG.info("{} terminated by {}; life of FlowFile = {} ms", new Object[] {flowFile, terminator, flowFileLife});
} else if (record.isWorking() && record.getWorkingClaim() != record.getOriginalClaim()) {
// records which have been updated - remove original if exists
- removeContent(record.getOriginalClaim());
+ decrementClaimCount(record.getOriginalClaim());
}
}
@@ -923,12 +923,12 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final Set<StandardRepositoryRecord> transferRecords = new HashSet<>();
for (final StandardRepositoryRecord record : recordsToHandle) {
if (record.isMarkedForAbort()) {
- removeContent(record.getWorkingClaim());
+ decrementClaimCount(record.getWorkingClaim());
if (record.getCurrentClaim() != null && !record.getCurrentClaim().equals(record.getWorkingClaim())) {
// if working & original claim are same, don't remove twice; we only want to remove the original
// if it's different from the working. Otherwise, we remove two claimant counts. This causes
// an issue if we only updated the flowfile attributes.
- removeContent(record.getCurrentClaim());
+ decrementClaimCount(record.getCurrentClaim());
}
abortedRecords.add(record);
} else {
@@ -1020,7 +1020,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
- private void removeContent(final ContentClaim claim) {
+ private void decrementClaimCount(final ContentClaim claim) {
if (claim == null) {
return;
}
@@ -1733,7 +1733,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
record.markForDelete();
expiredRecords.add(record);
expiredReporter.expire(flowFile, "Expiration Threshold = " + connection.getFlowFileQueue().getFlowFileExpiration());
- removeContent(flowFile.getContentClaim());
+ decrementClaimCount(flowFile.getContentClaim());
final long flowFileLife = System.currentTimeMillis() - flowFile.getEntryDate();
final Object terminator = connectable instanceof ProcessorNode ? ((ProcessorNode) connectable).getProcessor() : connectable;
@@ -2198,9 +2198,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
originalByteWrittenCount = outStream.getBytesWritten();
// wrap our OutputStreams so that the processor cannot close it
- try (final OutputStream disableOnClose = new DisableOnCloseOutputStream(outStream)) {
+ try (final OutputStream disableOnClose = new DisableOnCloseOutputStream(outStream);
+ final OutputStream flowFileAccessOutStream = new FlowFileAccessOutputStream(disableOnClose, source)) {
recursionSet.add(source);
- writer.process(disableOnClose);
+ writer.process(flowFileAccessOutStream);
} finally {
recursionSet.remove(source);
}
@@ -2210,15 +2211,37 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
newSize = outStream.getBytesWritten();
} catch (final ContentNotFoundException nfe) {
resetWriteClaims(); // need to reset write claim before we can remove the claim
- destroyContent(newClaim);
+
+ // If the content claim changed, then we should destroy the new one. We do this
+ // because the new content claim will never get set as the 'working claim' for the FlowFile
+ // record since we will throw an Exception. As a result, we need to ensure that we have
+ // appropriately decremented the claimant count and can destroy the content if it is no
+ // longer in use. However, it is critical that we do this ONLY if the content claim has
+ // changed. Otherwise, the FlowFile already has a reference to this Content Claim and
+ // whenever the FlowFile is removed, the claim count will be decremented; if we decremented
+ // it here also, we would be decrementing the claimant count twice!
+ if (newClaim != oldClaim) {
+ destroyContent(newClaim);
+ }
+
handleContentNotFound(nfe, record);
} catch (final IOException ioe) {
resetWriteClaims(); // need to reset write claim before we can remove the claim
- destroyContent(newClaim);
- throw new FlowFileAccessException("Exception in callback: " + ioe.toString(), ioe);
+
+ // See above explanation for why this is done only if newClaim != oldClaim
+ if (newClaim != oldClaim) {
+ destroyContent(newClaim);
+ }
+
+ throw new ProcessException("IOException thrown from " + connectableDescription + ": " + ioe.toString(), ioe);
} catch (final Throwable t) {
resetWriteClaims(); // need to reset write claim before we can remove the claim
- destroyContent(newClaim);
+
+ // See above explanation for why this is done only if newClaim != oldClaim
+ if (newClaim != oldClaim) {
+ destroyContent(newClaim);
+ }
+
throw t;
} finally {
if (outStream != null) {
@@ -2227,6 +2250,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
}
+ // If the record already has a working claim, and this is the first time that we are appending to the FlowFile,
+ // destroy the current working claim because it is a temporary claim that
+ // is no longer going to be used, as we are about to set a new working claim. This would happen, for instance, if
+ // the FlowFile was written to, via #write() and then append() was called.
if (newClaim != oldClaim) {
removeTemporaryClaim(record);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/4e08ea65/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
index c00b91a..b36a5e6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
@@ -495,6 +495,55 @@ public class TestFileSystemRepository {
}
+ @Test
+ public void testWriteCannotProvideNullOutput() throws IOException {
+ FileSystemRepository repository = null;
+ try {
+ final List<Path> archivedPathsWithOpenStream = Collections.synchronizedList(new ArrayList<Path>());
+
+ // We are creating our own 'local' repository in this test so shut down the one created in the setup() method
+ shutdown();
+
+ repository = new FileSystemRepository() {
+ @Override
+ protected boolean archive(Path curPath) throws IOException {
+ if (getOpenStreamCount() > 0) {
+ archivedPathsWithOpenStream.add(curPath);
+ }
+
+ return true;
+ }
+ };
+
+ final StandardResourceClaimManager claimManager = new StandardResourceClaimManager();
+ repository.initialize(claimManager);
+ repository.purge();
+
+ final ContentClaim claim = repository.create(false);
+
+ assertEquals(1, claimManager.getClaimantCount(claim.getResourceClaim()));
+
+ int claimantCount = claimManager.decrementClaimantCount(claim.getResourceClaim());
+ assertEquals(0, claimantCount);
+ assertTrue(archivedPathsWithOpenStream.isEmpty());
+
+ OutputStream out = repository.write(claim);
+ out.close();
+ repository.decrementClaimantCount(claim);
+
+ ContentClaim claim2 = repository.create(false);
+ assertEquals(claim.getResourceClaim(), claim2.getResourceClaim());
+ out = repository.write(claim2);
+
+ final boolean archived = repository.archive(claim.getResourceClaim());
+ assertFalse(archived);
+ } finally {
+ if (repository != null) {
+ repository.shutdown();
+ }
+ }
+ }
+
/**
* We have encountered a situation where the File System Repo is moving files to archive and then eventually
* aging them off while there is still an open file handle. This test is meant to replicate the conditions under
http://git-wip-us.apache.org/repos/asf/nifi/blob/4e08ea65/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index c1bcacd..2f3bff5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -68,6 +68,7 @@ import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processor.exception.MissingFlowFileException;
@@ -144,7 +145,8 @@ public class TestStandardProcessSession {
final ProcessScheduler processScheduler = Mockito.mock(ProcessScheduler.class);
final FlowFileSwapManager swapManager = Mockito.mock(FlowFileSwapManager.class);
- flowFileQueue = new StandardFlowFileQueue("1", connection, flowFileRepo, provenanceRepo, null, processScheduler, swapManager, null, 10000);
+ final StandardFlowFileQueue actualQueue = new StandardFlowFileQueue("1", connection, flowFileRepo, provenanceRepo, null, processScheduler, swapManager, null, 10000);
+ flowFileQueue = Mockito.spy(actualQueue);
when(connection.getFlowFileQueue()).thenReturn(flowFileQueue);
Mockito.doAnswer(new Answer<Object>() {
@@ -207,6 +209,71 @@ public class TestStandardProcessSession {
}
@Test
+ public void testAppendToChildThrowsIOExceptionThenRemove() throws IOException {
+ final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+ .id(1000L)
+ .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+ .entryDate(System.currentTimeMillis())
+ .build();
+ flowFileQueue.put(flowFileRecord);
+ FlowFile original = session.get();
+ assertNotNull(original);
+
+ FlowFile child = session.create(original);
+ child = session.append(child, out -> out.write("hello".getBytes()));
+
+ // Force an IOException. This will decrement our claim count for the resource claim.
+ try {
+ child = session.append(child, out -> {
+ throw new IOException();
+ });
+ Assert.fail("append() callback threw IOException but it was not wrapped in ProcessException");
+ } catch (final ProcessException pe) {
+ // expected
+ }
+
+ session.remove(child);
+ session.transfer(original);
+ session.commit();
+
+ final int numClaims = contentRepo.getExistingClaims().size();
+ assertEquals(0, numClaims);
+ }
+
+ @Test
+ public void testWriteForChildThrowsIOExceptionThenRemove() throws IOException {
+ final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+ .id(1000L)
+ .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+ .entryDate(System.currentTimeMillis())
+ .build();
+ flowFileQueue.put(flowFileRecord);
+ FlowFile original = session.get();
+ assertNotNull(original);
+
+ FlowFile child = session.create(original);
+ // Force an IOException. This will decrement our claim count for the resource claim.
+ try {
+ child = session.write(child, out -> out.write("hello".getBytes()));
+
+ child = session.write(child, out -> {
+ throw new IOException();
+ });
+ Assert.fail("write() callback threw IOException but it was not wrapped in ProcessException");
+ } catch (final ProcessException pe) {
+ // expected
+ }
+
+ session.remove(child);
+ session.transfer(original);
+ session.commit();
+
+ final int numClaims = contentRepo.getExistingClaims().size();
+ assertEquals(0, numClaims);
+ }
+
+
+ @Test
public void testModifyContentThenRollback() throws IOException {
assertEquals(0, contentRepo.getExistingClaims().size());
@@ -807,6 +874,57 @@ public class TestStandardProcessSession {
}
@Test
+ public void testAppendDoesNotDecrementContentClaimIfNotNeeded() {
+ FlowFile flowFile = session.create();
+
+ session.append(flowFile, new OutputStreamCallback() {
+ @Override
+ public void process(OutputStream out) throws IOException {
+ out.write("hello".getBytes());
+ }
+ });
+
+ final Set<ContentClaim> existingClaims = contentRepo.getExistingClaims();
+ assertEquals(1, existingClaims.size());
+ final ContentClaim claim = existingClaims.iterator().next();
+
+ final int countAfterAppend = contentRepo.getClaimantCount(claim);
+ assertEquals(1, countAfterAppend);
+ }
+
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testExpireDecrementsClaimsOnce() throws IOException {
+ final ContentClaim contentClaim = contentRepo.create(false);
+
+ final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+ .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+ .entryDate(System.currentTimeMillis())
+ .contentClaim(contentClaim)
+ .build();
+
+ Mockito.doAnswer(new Answer<List<FlowFileRecord>>() {
+ int iterations = 0;
+
+ @Override
+ public List<FlowFileRecord> answer(InvocationOnMock invocation) throws Throwable {
+ if (iterations++ == 0) {
+ final Set<FlowFileRecord> expired = invocation.getArgumentAt(1, Set.class);
+ expired.add(flowFileRecord);
+ }
+
+ return null;
+ }
+ }).when(flowFileQueue).poll(Mockito.any(FlowFileFilter.class), Mockito.any(Set.class));
+
+ session.expireFlowFiles();
+ session.commit(); // if the content claim count is decremented to less than 0, an exception will be thrown.
+
+ assertEquals(1L, contentRepo.getClaimsRemoved());
+ }
+
+ @Test
public void testManyFilesOpened() throws IOException {
StandardProcessSession[] standardProcessSessions = new StandardProcessSession[100000];