You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2019/11/05 18:06:45 UTC

[nifi] branch master updated: NIFI-6841: Fixed bug that resulted in the wrong number of 'Bytes Read' being reported by ByteCountingInputStream in the event that #skip was called between calls to #mark and #reset

This is an automated email from the ASF dual-hosted git repository.

mcgilman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new a9db5a8  NIFI-6841: Fixed bug that resulted in the wrong number of 'Bytes Read' being reported by ByteCountingInputStream in the event that #skip was called between calls to #mark and #reset
a9db5a8 is described below

commit a9db5a8cb7313005b4077b66ce10ef81d3055ee8
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Tue Nov 5 09:38:01 2019 -0500

    NIFI-6841: Fixed bug that resulted in the wrong number of 'Bytes Read' being reported by ByteCountingInputStream in the event that #skip was called between calls to #mark and #reset
    
    This closes #3868
---
 .../nifi/stream/io/ByteCountingInputStream.java    | 19 ++++---
 .../repository/TestStandardProcessSession.java     | 66 +++++++++++++++++++++-
 2 files changed, 77 insertions(+), 8 deletions(-)

diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingInputStream.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingInputStream.java
index 4c2372c..1d4ac0f 100644
--- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingInputStream.java
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingInputStream.java
@@ -25,7 +25,8 @@ public class ByteCountingInputStream extends InputStream {
     private long bytesRead = 0L;
     private long bytesSkipped = 0L;
 
-    private long bytesSinceMark = 0L;
+    private long bytesReadSinceMark = 0L;
+    private long bytesSkippedSinceMark = 0L;
 
     public ByteCountingInputStream(final InputStream in) {
         this.in = in;
@@ -41,7 +42,7 @@ public class ByteCountingInputStream extends InputStream {
         final int fromSuper = in.read();
         if (fromSuper >= 0) {
             bytesRead++;
-            bytesSinceMark++;
+            bytesReadSinceMark++;
         }
         return fromSuper;
     }
@@ -51,7 +52,7 @@ public class ByteCountingInputStream extends InputStream {
         final int fromSuper = in.read(b, off, len);
         if (fromSuper >= 0) {
             bytesRead += fromSuper;
-            bytesSinceMark += fromSuper;
+            bytesReadSinceMark += fromSuper;
         }
 
         return fromSuper;
@@ -67,7 +68,7 @@ public class ByteCountingInputStream extends InputStream {
         final long skipped = in.skip(n);
         if (skipped >= 0) {
             bytesSkipped += skipped;
-            bytesSinceMark += skipped;
+            bytesSkippedSinceMark += skipped;
         }
         return skipped;
     }
@@ -88,7 +89,8 @@ public class ByteCountingInputStream extends InputStream {
     public void mark(final int readlimit) {
         in.mark(readlimit);
 
-        bytesSinceMark = 0L;
+        bytesReadSinceMark = 0L;
+        bytesSkippedSinceMark = 0L;
     }
 
     @Override
@@ -99,8 +101,11 @@ public class ByteCountingInputStream extends InputStream {
     @Override
     public void reset() throws IOException {
         in.reset();
-        bytesRead -= bytesSinceMark;
-        bytesSinceMark = 0L;
+        bytesRead -= bytesReadSinceMark;
+        bytesSkipped -= bytesSkippedSinceMark;
+
+        bytesReadSinceMark = 0L;
+        bytesSkippedSinceMark = 0L;
     }
 
     @Override
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index 267f22d..e347c15 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -29,6 +29,7 @@ import org.apache.nifi.controller.repository.claim.ResourceClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 import org.apache.nifi.controller.repository.claim.StandardContentClaim;
 import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
+import org.apache.nifi.controller.repository.metrics.RingBufferEventRepository;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.groups.ProcessGroup;
@@ -114,6 +115,7 @@ public class TestStandardProcessSession {
     private ProvenanceEventRepository provenanceRepo;
     private MockFlowFileRepository flowFileRepo;
     private CounterRepository counterRepository;
+    private FlowFileEventRepository flowFileEventRepo;
     private final Relationship FAKE_RELATIONSHIP = new Relationship.Builder().name("FAKE").build();
     private static StandardResourceClaimManager resourceClaimManager;
 
@@ -155,7 +157,7 @@ public class TestStandardProcessSession {
         resourceClaimManager = new StandardResourceClaimManager();
 
         System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, TestStandardProcessSession.class.getResource("/conf/nifi.properties").getFile());
-        final FlowFileEventRepository flowFileEventRepo = Mockito.mock(FlowFileEventRepository.class);
+        flowFileEventRepo = new RingBufferEventRepository(1);
         counterRepository = new StandardCounterRepository();
         provenanceRepo = new MockProvenanceRepository();
 
@@ -341,6 +343,68 @@ public class TestStandardProcessSession {
     }
 
     @Test
+    public void testReadCountCorrectWhenSkippingWithReadCallback() throws IOException {
+        final byte[] content = "This and that and the other.".getBytes(StandardCharsets.UTF_8);
+
+        final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+            .id(1000L)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .contentClaim(contentRepo.create(content))
+            .size(content.length)
+            .build();
+
+        flowFileQueue.put(flowFileRecord);
+
+        FlowFile flowFile = session.get();
+        session.read(flowFile, in -> {
+            assertEquals('T', (char) in.read());
+            in.mark(10);
+            assertEquals(5, in.skip(5L));
+            assertEquals('n', (char) in.read());
+            in.reset();
+        });
+
+        session.transfer(flowFile);
+        session.commit();
+
+        final RepositoryStatusReport report = flowFileEventRepo.reportTransferEvents(0L);
+        final long bytesRead = report.getReportEntry("connectable-1").getBytesRead();
+        assertEquals(1, bytesRead);
+    }
+
+    @Test
+    public void testReadCountCorrectWhenSkippingWithReadInputStream() throws IOException {
+        final byte[] content = "This and that and the other.".getBytes(StandardCharsets.UTF_8);
+
+        final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+            .id(1000L)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .contentClaim(contentRepo.create(content))
+            .size(content.length)
+            .build();
+
+        flowFileQueue.put(flowFileRecord);
+
+        FlowFile flowFile = session.get();
+        try (InputStream in = session.read(flowFile)) {
+            assertEquals('T', (char) in.read());
+            in.mark(10);
+            assertEquals(5, in.skip(5L));
+            assertEquals('n', (char) in.read());
+            in.reset();
+        };
+
+        session.transfer(flowFile);
+        session.commit();
+
+        final RepositoryStatusReport report = flowFileEventRepo.reportTransferEvents(0L);
+        final long bytesRead = report.getReportEntry("connectable-1").getBytesRead();
+        assertEquals(1, bytesRead);
+    }
+
+    @Test
     public void testHandlingOfMultipleFlowFilesWithSameId() {
         // Add two FlowFiles with the same ID
         for (int i=0; i < 2; i++) {