You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2019/11/05 18:06:45 UTC
[nifi] branch master updated: NIFI-6841: Fixed bug that resulted in
the wrong number of 'Bytes Read' being reported by ByteCountingInputStream
in the event that #skip was called between calls to #mark and #reset
This is an automated email from the ASF dual-hosted git repository.
mcgilman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new a9db5a8 NIFI-6841: Fixed bug that resulted in the wrong number of 'Bytes Read' being reported by ByteCountingInputStream in the event that #skip was called between calls to #mark and #reset
a9db5a8 is described below
commit a9db5a8cb7313005b4077b66ce10ef81d3055ee8
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Tue Nov 5 09:38:01 2019 -0500
NIFI-6841: Fixed bug that resulted in the wrong number of 'Bytes Read' being reported by ByteCountingInputStream in the event that #skip was called between calls to #mark and #reset
This closes #3868
---
.../nifi/stream/io/ByteCountingInputStream.java | 19 ++++---
.../repository/TestStandardProcessSession.java | 66 +++++++++++++++++++++-
2 files changed, 77 insertions(+), 8 deletions(-)
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingInputStream.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingInputStream.java
index 4c2372c..1d4ac0f 100644
--- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingInputStream.java
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingInputStream.java
@@ -25,7 +25,8 @@ public class ByteCountingInputStream extends InputStream {
private long bytesRead = 0L;
private long bytesSkipped = 0L;
- private long bytesSinceMark = 0L;
+ private long bytesReadSinceMark = 0L;
+ private long bytesSkippedSinceMark = 0L;
public ByteCountingInputStream(final InputStream in) {
this.in = in;
@@ -41,7 +42,7 @@ public class ByteCountingInputStream extends InputStream {
final int fromSuper = in.read();
if (fromSuper >= 0) {
bytesRead++;
- bytesSinceMark++;
+ bytesReadSinceMark++;
}
return fromSuper;
}
@@ -51,7 +52,7 @@ public class ByteCountingInputStream extends InputStream {
final int fromSuper = in.read(b, off, len);
if (fromSuper >= 0) {
bytesRead += fromSuper;
- bytesSinceMark += fromSuper;
+ bytesReadSinceMark += fromSuper;
}
return fromSuper;
@@ -67,7 +68,7 @@ public class ByteCountingInputStream extends InputStream {
final long skipped = in.skip(n);
if (skipped >= 0) {
bytesSkipped += skipped;
- bytesSinceMark += skipped;
+ bytesSkippedSinceMark += skipped;
}
return skipped;
}
@@ -88,7 +89,8 @@ public class ByteCountingInputStream extends InputStream {
public void mark(final int readlimit) {
in.mark(readlimit);
- bytesSinceMark = 0L;
+ bytesReadSinceMark = 0L;
+ bytesSkippedSinceMark = 0L;
}
@Override
@@ -99,8 +101,11 @@ public class ByteCountingInputStream extends InputStream {
@Override
public void reset() throws IOException {
in.reset();
- bytesRead -= bytesSinceMark;
- bytesSinceMark = 0L;
+ bytesRead -= bytesReadSinceMark;
+ bytesSkipped -= bytesSkippedSinceMark;
+
+ bytesReadSinceMark = 0L;
+ bytesSkippedSinceMark = 0L;
}
@Override
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index 267f22d..e347c15 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -29,6 +29,7 @@ import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.repository.claim.StandardContentClaim;
import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
+import org.apache.nifi.controller.repository.metrics.RingBufferEventRepository;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.groups.ProcessGroup;
@@ -114,6 +115,7 @@ public class TestStandardProcessSession {
private ProvenanceEventRepository provenanceRepo;
private MockFlowFileRepository flowFileRepo;
private CounterRepository counterRepository;
+ private FlowFileEventRepository flowFileEventRepo;
private final Relationship FAKE_RELATIONSHIP = new Relationship.Builder().name("FAKE").build();
private static StandardResourceClaimManager resourceClaimManager;
@@ -155,7 +157,7 @@ public class TestStandardProcessSession {
resourceClaimManager = new StandardResourceClaimManager();
System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, TestStandardProcessSession.class.getResource("/conf/nifi.properties").getFile());
- final FlowFileEventRepository flowFileEventRepo = Mockito.mock(FlowFileEventRepository.class);
+ flowFileEventRepo = new RingBufferEventRepository(1);
counterRepository = new StandardCounterRepository();
provenanceRepo = new MockProvenanceRepository();
@@ -341,6 +343,68 @@ public class TestStandardProcessSession {
}
@Test
+ public void testReadCountCorrectWhenSkippingWithReadCallback() throws IOException {
+ final byte[] content = "This and that and the other.".getBytes(StandardCharsets.UTF_8);
+
+ final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+ .id(1000L)
+ .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+ .entryDate(System.currentTimeMillis())
+ .contentClaim(contentRepo.create(content))
+ .size(content.length)
+ .build();
+
+ flowFileQueue.put(flowFileRecord);
+
+ FlowFile flowFile = session.get();
+ session.read(flowFile, in -> {
+ assertEquals('T', (char) in.read());
+ in.mark(10);
+ assertEquals(5, in.skip(5L));
+ assertEquals('n', (char) in.read());
+ in.reset();
+ });
+
+ session.transfer(flowFile);
+ session.commit();
+
+ final RepositoryStatusReport report = flowFileEventRepo.reportTransferEvents(0L);
+ final long bytesRead = report.getReportEntry("connectable-1").getBytesRead();
+ assertEquals(1, bytesRead);
+ }
+
+ @Test
+ public void testReadCountCorrectWhenSkippingWithReadInputStream() throws IOException {
+ final byte[] content = "This and that and the other.".getBytes(StandardCharsets.UTF_8);
+
+ final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+ .id(1000L)
+ .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+ .entryDate(System.currentTimeMillis())
+ .contentClaim(contentRepo.create(content))
+ .size(content.length)
+ .build();
+
+ flowFileQueue.put(flowFileRecord);
+
+ FlowFile flowFile = session.get();
+ try (InputStream in = session.read(flowFile)) {
+ assertEquals('T', (char) in.read());
+ in.mark(10);
+ assertEquals(5, in.skip(5L));
+ assertEquals('n', (char) in.read());
+ in.reset();
+ };
+
+ session.transfer(flowFile);
+ session.commit();
+
+ final RepositoryStatusReport report = flowFileEventRepo.reportTransferEvents(0L);
+ final long bytesRead = report.getReportEntry("connectable-1").getBytesRead();
+ assertEquals(1, bytesRead);
+ }
+
+ @Test
public void testHandlingOfMultipleFlowFilesWithSameId() {
// Add two FlowFiles with the same ID
for (int i=0; i < 2; i++) {