You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jp...@apache.org on 2016/11/23 17:13:18 UTC
nifi git commit: NIFI-3091: Ensure that we set the appropriate size
on FlowFiles when modifying them
Repository: nifi
Updated Branches:
refs/heads/master 91ff810db -> 7ff14f719
NIFI-3091: Ensure that we set the appropriate size on FlowFiles when modifying them
This closes #1267
Signed-off-by: jpercivall <JP...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7ff14f71
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7ff14f71
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7ff14f71
Branch: refs/heads/master
Commit: 7ff14f7191f8bbb1722340ccdd963d3e7d24b9e4
Parents: 91ff810
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Nov 23 11:09:42 2016 -0500
Committer: jpercivall <JP...@apache.org>
Committed: Wed Nov 23 12:07:44 2016 -0500
----------------------------------------------------------------------
.../repository/StandardProcessSession.java | 3 +-
.../repository/TestStandardProcessSession.java | 59 ++++++++++++++++++++
2 files changed, 61 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/7ff14f71/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 002bac9..80c917c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -2580,7 +2580,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
cnfeThrown = true;
throw cnfe;
} finally {
- this.bytesWritten += countingOut.getBytesWritten();
+ writtenToFlowFile = countingOut.getBytesWritten();
+ this.bytesWritten += writtenToFlowFile;
this.bytesRead += countingIn.getBytesRead();
recursionSet.remove(source);
http://git-wip-us.apache.org/repos/asf/nifi/blob/7ff14f71/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index 8cc088d..6f94994 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -277,6 +277,65 @@ public class TestStandardProcessSession {
}
@Test
+ public void testModifyContentWithStreamCallbackHasCorrectSize() throws IOException {
+ final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+ .id(1000L)
+ .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+ .entryDate(System.currentTimeMillis())
+ .build();
+ flowFileQueue.put(flowFileRecord);
+ FlowFile original = session.get();
+ assertNotNull(original);
+
+ FlowFile child = session.write(original, (in, out) -> out.write("hello".getBytes()));
+ session.transfer(child);
+ session.commit();
+
+ final FlowFileRecord onQueue = flowFileQueue.poll(Collections.emptySet());
+ assertEquals(5, onQueue.getSize());
+ }
+
+ @Test
+ public void testModifyContentWithOutputStreamCallbackHasCorrectSize() throws IOException {
+ final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+ .id(1000L)
+ .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+ .entryDate(System.currentTimeMillis())
+ .build();
+ flowFileQueue.put(flowFileRecord);
+ FlowFile original = session.get();
+ assertNotNull(original);
+
+ FlowFile child = session.write(original, out -> out.write("hello".getBytes()));
+ session.transfer(child);
+ session.commit();
+
+ final FlowFileRecord onQueue = flowFileQueue.poll(Collections.emptySet());
+ assertEquals(5, onQueue.getSize());
+ }
+
+ @Test
+ public void testModifyContentWithAppendHasCorrectSize() throws IOException {
+ final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+ .id(1000L)
+ .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+ .entryDate(System.currentTimeMillis())
+ .build();
+ flowFileQueue.put(flowFileRecord);
+ FlowFile original = session.get();
+ assertNotNull(original);
+
+ FlowFile child = session.append(original, out -> out.write("hello".getBytes()));
+ session.transfer(child);
+ session.commit();
+
+ final FlowFileRecord onQueue = flowFileQueue.poll(Collections.emptySet());
+ assertEquals(5, onQueue.getSize());
+ }
+
+
+
+ @Test
public void testModifyContentThenRollback() throws IOException {
assertEquals(0, contentRepo.getExistingClaims().size());