You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2021/09/29 12:15:48 UTC

[GitHub] [nifi] theBestAndrew commented on a change in pull request #5418: NIFI-8727: Addressed bug in which ProcessSession doesn't properly dec…

theBestAndrew commented on a change in pull request #5418:
URL: https://github.com/apache/nifi/pull/5418#discussion_r718448390



##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java
##########
@@ -2603,6 +2603,205 @@ public void testFullRollbackAFterCheckpointDoesNotStoreState() throws IOExceptio
         stateManager.assertStateNotSet();
     }
 
+    @Test
+    public void testCloneThenRollbackCountsClaimReferencesProperly() throws IOException {
+        final ContentClaim originalClaim = contentRepo.create(false);
+        try (final OutputStream out = contentRepo.write(originalClaim)) {
+            out.write("hello, world".getBytes());
+        }
+
+        assertEquals(1, contentRepo.getClaimantCount(originalClaim));
+
+        final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+            .contentClaim(originalClaim)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .size(12L)
+            .build();
+        flowFileQueue.put(flowFileRecord);
+
+        final FlowFile flowFile = session.get();
+
+        FlowFile clone = session.clone(flowFile);
+        session.rollback();
+        assertEquals(1, contentRepo.getClaimantCount(originalClaim));
+    }
+
+    @Test
+    public void testCloneThenWriteThenRollbackCountsClaimReferencesProperly() throws IOException {
+        final ContentClaim originalClaim = contentRepo.create(false);
+        try (final OutputStream out = contentRepo.write(originalClaim)) {
+            out.write("hello, world".getBytes());
+        }
+
+        assertEquals(1, contentRepo.getClaimantCount(originalClaim));
+
+        final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+            .contentClaim(originalClaim)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .size(12L)
+            .build();
+        flowFileQueue.put(flowFileRecord);
+
+        final FlowFile flowFile = session.get();
+
+        FlowFile clone = session.clone(flowFile);
+        clone = session.write(flowFile, out -> out.write("Bye".getBytes()));
+        assertEquals(1, contentRepo.getClaimantCount(getContentClaim(clone)));
+
+        session.rollback();
+        assertEquals(1, contentRepo.getClaimantCount(originalClaim));
+        assertEquals(0, contentRepo.getClaimantCount(getContentClaim(clone)));
+    }
+
+    @Test
+    public void testCloneThenAppendThenRollbackCountsClaimReferencesProperly() throws IOException {
+        final ContentClaim originalClaim = contentRepo.create(false);
+        try (final OutputStream out = contentRepo.write(originalClaim)) {
+            out.write("hello, world".getBytes());
+        }
+
+        assertEquals(1, contentRepo.getClaimantCount(originalClaim));
+
+        final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+            .contentClaim(originalClaim)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .size(12L)
+            .build();
+        flowFileQueue.put(flowFileRecord);
+
+        final FlowFile flowFile = session.get();
+
+        FlowFile clone = session.clone(flowFile);
+        clone = session.append(flowFile, out -> out.write("Bye".getBytes()));
+        assertEquals(1, contentRepo.getClaimantCount(getContentClaim(clone)));
+
+        session.rollback();
+        assertEquals(1, contentRepo.getClaimantCount(originalClaim));
+        assertEquals(0, contentRepo.getClaimantCount(getContentClaim(clone)));
+    }
+
+    @Test
+    public void testCloneThenWriteCountsClaimReferencesProperly() throws IOException {
+        final ContentClaim originalClaim = contentRepo.create(false);
+        try (final OutputStream out = contentRepo.write(originalClaim)) {
+            out.write("hello, world".getBytes());
+        }
+
+        assertEquals(1, contentRepo.getClaimantCount(originalClaim));
+
+        final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+            .contentClaim(originalClaim)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .size(12L)
+            .build();
+        flowFileQueue.put(flowFileRecord);
+
+        final FlowFile flowFile = session.get();
+
+        FlowFile clone = session.clone(flowFile);
+
+        // Expect claimant count of 2 because the clone() means that the new FlowFile points to the same content claim.
+        assertEquals(2, contentRepo.getClaimantCount(originalClaim));
+
+        // Should be able to write to the FlowFile any number of times, and each time it should leave us with a Content Claim Claimant Count of 1 for the original (because the new FlowFile will no
+        // longer point at the original claim) and 1 for the new Content Claim.
+        for (int i=0; i < 10; i++) {
+            final ContentClaim previousCloneClaim = getContentClaim(clone);
+            clone = session.write(clone, out -> out.write("bye".getBytes()));
+
+            // After modifying the content of the FlowFile, the claimant count of the 'old' content claim should be 1, as should the claimant count of the updated content claim.
+            final ContentClaim updatedCloneClaim = getContentClaim(clone);
+            assertEquals(1, contentRepo.getClaimantCount(updatedCloneClaim));
+            assertEquals(1, contentRepo.getClaimantCount(originalClaim));
+            assertEquals(1, contentRepo.getClaimantCount(previousCloneClaim));
+        }
+    }
+
+    private ContentClaim getContentClaim(final FlowFile flowFile) {
+        return ((FlowFileRecord) flowFile).getContentClaim();
+    }
+
+    @Test
+    public void testCreateChildThenWriteCountsClaimReferencesProperly() throws IOException {
+        final ContentClaim claim = contentRepo.create(false);
+        try (final OutputStream out = contentRepo.write(claim)) {
+            out.write("hello, world".getBytes());
+        }
+
+        assertEquals(1, contentRepo.getClaimantCount(claim));
+
+        final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+            .contentClaim(claim)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .size(12L)
+            .build();
+        flowFileQueue.put(flowFileRecord);
+
+        final FlowFile flowFile = session.get();
+
+        FlowFile clone = session.create(flowFile);
+        assertEquals(1, contentRepo.getClaimantCount(claim));
+
+        clone = session.write(clone, out -> out.write("bye".getBytes()));
+
+        final ContentClaim updatedCloneClaim = getContentClaim(clone);
+        assertEquals(1, contentRepo.getClaimantCount(updatedCloneClaim));
+        assertEquals(1, contentRepo.getClaimantCount(claim));
+    }
+
+    @Test
+    public void testCreateChildThenMultipleWriteCountsClaimReferencesProperly() throws IOException {
+        final ContentClaim claim = contentRepo.create(false);
+        try (final OutputStream out = contentRepo.write(claim)) {
+            out.write("hello, world".getBytes());
+        }
+
+        assertEquals(1, contentRepo.getClaimantCount(claim));
+
+        final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+            .contentClaim(claim)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .size(12L)
+            .build();
+        flowFileQueue.put(flowFileRecord);
+
+        final FlowFile flowFile = session.get();
+
+        FlowFile clone = session.create(flowFile);
+        assertEquals(1, contentRepo.getClaimantCount(claim));
+
+        for (int i=0; i < 100; i++) {
+            clone = session.write(clone, out -> out.write("bye".getBytes()));
+
+            final ContentClaim updatedCloneClaim = getContentClaim(clone);
+            assertEquals(1, contentRepo.getClaimantCount(updatedCloneClaim));
+            assertEquals(1, contentRepo.getClaimantCount(claim));
+        }
+    }
+
+    @Test
+    public void testCreateNewFlowFileWithoutParentThenMultipleWritesCountsClaimReferencesProperly() {
+        FlowFile clone = session.create();

Review comment:
       My understanding is that this is a new FlowFile, not a clone of a previous one, so should the variable name be changed?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org