You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2023/06/02 19:33:55 UTC

[nifi] branch support/nifi-1.x updated: NIFI-11636: Do not buffer Parquet content into memory unnecessarily

This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new 5da77e8e74 NIFI-11636: Do not buffer Parquet content into memory unnecessarily
5da77e8e74 is described below

commit 5da77e8e74343f04ebe7da34ba47cbc3cbe0d4fc
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Fri Jun 2 13:21:15 2023 -0400

    NIFI-11636: Do not buffer Parquet content into memory unnecessarily
    
    NIFI-11636: Change the default log level of Parquet's internal record reader (org.apache.parquet.hadoop.InternalParquetRecordReader) to WARN, as it logs excessively at INFO level
    Signed-off-by: Matt Burgess <ma...@apache.org>
---
 .../nifi-resources/src/main/resources/conf/logback.xml            | 2 +-
 .../java/org/apache/nifi/parquet/stream/NifiParquetInputFile.java | 5 ++---
 .../org/apache/nifi/parquet/stream/NifiSeekableInputStream.java   | 8 ++++----
 3 files changed, 7 insertions(+), 8 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml
index 4a1e57e1f5..ae2733f880 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml
@@ -119,7 +119,7 @@
     <logger name="org.apache.nifi.processors.standard.LogAttribute" level="INFO"/>
     <logger name="org.apache.nifi.processors.standard.LogMessage" level="INFO"/>
     <logger name="org.apache.nifi.controller.repository.StandardProcessSession" level="WARN" />
-
+    <logger name="org.apache.parquet.hadoop.InternalParquetRecordReader" level="WARN" />
 
     <logger name="org.apache.zookeeper.ClientCnxn" level="ERROR" />
     <logger name="org.apache.zookeeper.server.NIOServerCnxn" level="ERROR" />
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/stream/NifiParquetInputFile.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/stream/NifiParquetInputFile.java
index c4ac722c7b..98e8cf3f75 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/stream/NifiParquetInputFile.java
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/stream/NifiParquetInputFile.java
@@ -20,7 +20,6 @@ import org.apache.nifi.stream.io.ByteCountingInputStream;
 import org.apache.parquet.io.InputFile;
 import org.apache.parquet.io.SeekableInputStream;
 
-import java.io.IOException;
 import java.io.InputStream;
 
 public class NifiParquetInputFile implements InputFile {
@@ -42,12 +41,12 @@ public class NifiParquetInputFile implements InputFile {
     }
 
     @Override
-    public long getLength() throws IOException {
+    public long getLength() {
         return length;
     }
 
     @Override
-    public SeekableInputStream newStream() throws IOException {
+    public SeekableInputStream newStream() {
         return new NifiSeekableInputStream(input);
     }
 }
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/stream/NifiSeekableInputStream.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/stream/NifiSeekableInputStream.java
index cd6b820536..89d2cc0c32 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/stream/NifiSeekableInputStream.java
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/stream/NifiSeekableInputStream.java
@@ -29,11 +29,11 @@ public class NifiSeekableInputStream extends DelegatingSeekableInputStream {
     public NifiSeekableInputStream(final ByteCountingInputStream input) {
         super(input);
         this.input = input;
-        this.input.mark(Integer.MAX_VALUE);
+        this.input.mark(8192);
     }
 
     @Override
-    public long getPos() throws IOException {
+    public long getPos() {
         return input.getBytesConsumed();
     }
 
@@ -47,7 +47,7 @@ public class NifiSeekableInputStream extends DelegatingSeekableInputStream {
         if (newPos < currentPos) {
             // seeking backwards so first reset back to beginning of the stream then seek
             input.reset();
-            input.mark(Integer.MAX_VALUE);
+            input.mark(8192);
         }
 
         // must call getPos() again in case reset was called above
@@ -65,7 +65,7 @@ public class NifiSeekableInputStream extends DelegatingSeekableInputStream {
     }
 
     @Override
-    public synchronized void reset() throws IOException {
+    public synchronized void reset() {
         throw new UnsupportedOperationException("Mark/reset is not supported");
     }
 }