You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jp...@apache.org on 2016/11/23 17:13:18 UTC

nifi git commit: NIFI-3091: Ensure that we set the appropriate size on FlowFiles when modifying them

Repository: nifi
Updated Branches:
  refs/heads/master 91ff810db -> 7ff14f719


NIFI-3091: Ensure that we set the appropriate size on FlowFiles when modifying them

This closes #1267

Signed-off-by: jpercivall <JP...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7ff14f71
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7ff14f71
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7ff14f71

Branch: refs/heads/master
Commit: 7ff14f7191f8bbb1722340ccdd963d3e7d24b9e4
Parents: 91ff810
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Nov 23 11:09:42 2016 -0500
Committer: jpercivall <JP...@apache.org>
Committed: Wed Nov 23 12:07:44 2016 -0500

----------------------------------------------------------------------
 .../repository/StandardProcessSession.java      |  3 +-
 .../repository/TestStandardProcessSession.java  | 59 ++++++++++++++++++++
 2 files changed, 61 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/7ff14f71/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 002bac9..80c917c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -2580,7 +2580,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
                     cnfeThrown = true;
                     throw cnfe;
                 } finally {
-                    this.bytesWritten += countingOut.getBytesWritten();
+                    writtenToFlowFile = countingOut.getBytesWritten();
+                    this.bytesWritten += writtenToFlowFile;
                     this.bytesRead += countingIn.getBytesRead();
                     recursionSet.remove(source);
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/7ff14f71/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index 8cc088d..6f94994 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -277,6 +277,65 @@ public class TestStandardProcessSession {
     }
 
     @Test
+    public void testModifyContentWithStreamCallbackHasCorrectSize() throws IOException {
+        final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+            .id(1000L)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .build();
+        flowFileQueue.put(flowFileRecord);
+        FlowFile original = session.get();
+        assertNotNull(original);
+
+        FlowFile child = session.write(original, (in, out) -> out.write("hello".getBytes()));
+        session.transfer(child);
+        session.commit();
+
+        final FlowFileRecord onQueue = flowFileQueue.poll(Collections.emptySet());
+        assertEquals(5, onQueue.getSize());
+    }
+
+    @Test
+    public void testModifyContentWithOutputStreamCallbackHasCorrectSize() throws IOException {
+        final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+            .id(1000L)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .build();
+        flowFileQueue.put(flowFileRecord);
+        FlowFile original = session.get();
+        assertNotNull(original);
+
+        FlowFile child = session.write(original, out -> out.write("hello".getBytes()));
+        session.transfer(child);
+        session.commit();
+
+        final FlowFileRecord onQueue = flowFileQueue.poll(Collections.emptySet());
+        assertEquals(5, onQueue.getSize());
+    }
+
+    @Test
+    public void testModifyContentWithAppendHasCorrectSize() throws IOException {
+        final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+            .id(1000L)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .build();
+        flowFileQueue.put(flowFileRecord);
+        FlowFile original = session.get();
+        assertNotNull(original);
+
+        FlowFile child = session.append(original, out -> out.write("hello".getBytes()));
+        session.transfer(child);
+        session.commit();
+
+        final FlowFileRecord onQueue = flowFileQueue.poll(Collections.emptySet());
+        assertEquals(5, onQueue.getSize());
+    }
+
+
+
+    @Test
     public void testModifyContentThenRollback() throws IOException {
         assertEquals(0, contentRepo.getExistingClaims().size());