You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2023/06/02 19:33:55 UTC
[nifi] branch support/nifi-1.x updated: NIFI-11636: Do not buffer Parquet content into memory unnecessarily
This is an automated email from the ASF dual-hosted git repository.
mattyb149 pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new 5da77e8e74 NIFI-11636: Do not buffer Parquet content into memory unnecessarily
5da77e8e74 is described below
commit 5da77e8e74343f04ebe7da34ba47cbc3cbe0d4fc
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Fri Jun 2 13:21:15 2023 -0400
NIFI-11636: Do not buffer Parquet content into memory unnecessarily
NIFI-11636: Change default log level for the Parquet internal record reader to WARN, as it logs excessively at INFO level
Signed-off-by: Matt Burgess <ma...@apache.org>
---
.../nifi-resources/src/main/resources/conf/logback.xml | 2 +-
.../java/org/apache/nifi/parquet/stream/NifiParquetInputFile.java | 5 ++---
.../org/apache/nifi/parquet/stream/NifiSeekableInputStream.java | 8 ++++----
3 files changed, 7 insertions(+), 8 deletions(-)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml
index 4a1e57e1f5..ae2733f880 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml
@@ -119,7 +119,7 @@
<logger name="org.apache.nifi.processors.standard.LogAttribute" level="INFO"/>
<logger name="org.apache.nifi.processors.standard.LogMessage" level="INFO"/>
<logger name="org.apache.nifi.controller.repository.StandardProcessSession" level="WARN" />
-
+ <logger name="org.apache.parquet.hadoop.InternalParquetRecordReader" level="WARN" />
<logger name="org.apache.zookeeper.ClientCnxn" level="ERROR" />
<logger name="org.apache.zookeeper.server.NIOServerCnxn" level="ERROR" />
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/stream/NifiParquetInputFile.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/stream/NifiParquetInputFile.java
index c4ac722c7b..98e8cf3f75 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/stream/NifiParquetInputFile.java
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/stream/NifiParquetInputFile.java
@@ -20,7 +20,6 @@ import org.apache.nifi.stream.io.ByteCountingInputStream;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.SeekableInputStream;
-import java.io.IOException;
import java.io.InputStream;
public class NifiParquetInputFile implements InputFile {
@@ -42,12 +41,12 @@ public class NifiParquetInputFile implements InputFile {
}
@Override
- public long getLength() throws IOException {
+ public long getLength() {
return length;
}
@Override
- public SeekableInputStream newStream() throws IOException {
+ public SeekableInputStream newStream() {
return new NifiSeekableInputStream(input);
}
}
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/stream/NifiSeekableInputStream.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/stream/NifiSeekableInputStream.java
index cd6b820536..89d2cc0c32 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/stream/NifiSeekableInputStream.java
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/stream/NifiSeekableInputStream.java
@@ -29,11 +29,11 @@ public class NifiSeekableInputStream extends DelegatingSeekableInputStream {
public NifiSeekableInputStream(final ByteCountingInputStream input) {
super(input);
this.input = input;
- this.input.mark(Integer.MAX_VALUE);
+ this.input.mark(8192);
}
@Override
- public long getPos() throws IOException {
+ public long getPos() {
return input.getBytesConsumed();
}
@@ -47,7 +47,7 @@ public class NifiSeekableInputStream extends DelegatingSeekableInputStream {
if (newPos < currentPos) {
// seeking backwards so first reset back to beginning of the stream then seek
input.reset();
- input.mark(Integer.MAX_VALUE);
+ input.mark(8192);
}
// must call getPos() again in case reset was called above
@@ -65,7 +65,7 @@ public class NifiSeekableInputStream extends DelegatingSeekableInputStream {
}
@Override
- public synchronized void reset() throws IOException {
+ public synchronized void reset() {
throw new UnsupportedOperationException("Mark/reset is not supported");
}
}