Posted to dev@parquet.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/07/08 03:25:00 UTC

[jira] [Commented] (PARQUET-1320) Fast clean unused direct memory when decompress

    [ https://issues.apache.org/jira/browse/PARQUET-1320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16535978#comment-16535978 ] 

ASF GitHub Bot commented on PARQUET-1320:
-----------------------------------------

caneGuy closed pull request #492: PARQUET-1320: Fix potential direct memory leak for snappy decompressor
URL: https://github.com/apache/parquet-mr/pull/492
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/NonBlockedDecompressorStream.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/NonBlockedDecompressorStream.java
index 2740e860b..a1fd47d97 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/NonBlockedDecompressorStream.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/NonBlockedDecompressorStream.java
@@ -18,6 +18,7 @@
  */
 package org.apache.parquet.hadoop.codec;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 
@@ -42,7 +43,7 @@ public int read(byte[] b, int off, int len) throws IOException {
 	  // Send all the compressed input to the decompressor.
 	  while (true) {
 		int compressedBytes = getCompressedData();
-		if (compressedBytes == -1) break;
+		if (compressedBytes == 0) break;
 		decompressor.setInput(buffer, 0, compressedBytes);
 	  }
 	  inputHandled = true;
@@ -54,4 +55,24 @@ public int read(byte[] b, int off, int len) throws IOException {
 	}
 	return decompressedBytes;
   }
+
+	@Override
+	protected int getCompressedData() throws IOException {
+		int len = this.in.available();
+
+		if (len > this.buffer.length) {
+			this.buffer = new byte[len];
+		}
+
+		int n = 0, off = 0;
+		while (n < len) {
+			int count = in.read(buffer, off + n, len - n);
+			if (count < 0) {
+				throw new EOFException("Unexpected end of block in input stream");
+			}
+			n += count;
+		}
+
+		return len;
+	}
 }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java
index 190f8d531..238871d86 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java
@@ -25,6 +25,7 @@
 import org.xerial.snappy.Snappy;
 
 import org.apache.parquet.Preconditions;
+import sun.nio.ch.DirectBuffer;
 
 public class SnappyDecompressor implements Decompressor {
   // Buffer for uncompressed output. This buffer grows as necessary.
@@ -34,6 +35,8 @@
   private ByteBuffer inputBuffer = ByteBuffer.allocateDirect(0);
 
   private boolean finished;
+
+  private int maxBufferSize = 64 * 1024 * 1024;
   
   /**
    * Fills specified buffer with uncompressed data. Returns actual number
@@ -61,7 +64,9 @@ public synchronized int decompress(byte[] buffer, int off, int len) throws IOExc
       // There is compressed input, decompress it now.
       int decompressedSize = Snappy.uncompressedLength(inputBuffer);
       if (decompressedSize > outputBuffer.capacity()) {
+        ByteBuffer oldBuffer = outputBuffer;
         outputBuffer = ByteBuffer.allocateDirect(decompressedSize);
+        ((DirectBuffer)oldBuffer).cleaner().clean();
       }
 
       // Reset the previous outputBuffer (i.e. set position to 0)
@@ -102,7 +107,9 @@ public synchronized void setInput(byte[] buffer, int off, int len) {
       ByteBuffer newBuffer = ByteBuffer.allocateDirect(inputBuffer.position() + len);
       inputBuffer.rewind();
       newBuffer.put(inputBuffer);
-      inputBuffer = newBuffer;      
+      ByteBuffer oldBuffer = inputBuffer;
+      inputBuffer = newBuffer;
+      ((DirectBuffer)(oldBuffer)).cleaner().clean();
     } else {
       inputBuffer.limit(inputBuffer.position() + len);
     }
@@ -131,6 +138,18 @@ public synchronized boolean needsInput() {
 
   @Override
   public synchronized void reset() {
+    if (inputBuffer.capacity() > maxBufferSize) {
+      ByteBuffer oldBuffer = inputBuffer;
+      inputBuffer = ByteBuffer.allocateDirect(maxBufferSize);
+      ((DirectBuffer)oldBuffer).cleaner().clean();
+    }
+
+    if (outputBuffer.capacity() > maxBufferSize) {
+      ByteBuffer oldBuffer = outputBuffer;
+      outputBuffer = ByteBuffer.allocateDirect(maxBufferSize);
+      ((DirectBuffer)oldBuffer).cleaner().clean();
+    }
+
     finished = false;
     inputBuffer.rewind();
     outputBuffer.rewind();
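
Two notes on the patch above. First, the overridden getCompressedData() sizes
its read by in.available() and returns that length, so an exhausted stream now
yields 0 instead of -1, which is why the end-of-input check in read() changes
accordingly. Second, the cleaner() calls are the actual leak fix: the native
memory behind a direct ByteBuffer is released only when the buffer object
itself is garbage-collected, so the patch frees each replaced buffer eagerly
via the JDK-internal sun.nio.ch.DirectBuffer interface (and reset()
additionally caps retained buffers at 64 MB). A minimal sketch of that
pattern, assuming Java 8, where the internal API is still accessible (Java 9+
hides it behind module boundaries); the class name is illustrative:

{code:java}
import java.nio.ByteBuffer;

import sun.nio.ch.DirectBuffer; // JDK-internal API; accessible on Java 8

public class DirectBufferCleanDemo { // illustrative class name
  public static void main(String[] args) {
    ByteBuffer buffer = ByteBuffer.allocateDirect(64 * 1024 * 1024);
    buffer.putInt(42); // ... use the buffer ...

    // Free the 64 MB off-heap allocation now instead of waiting for a GC
    // cycle to collect the small on-heap ByteBuffer object that owns it.
    ((DirectBuffer) buffer).cleaner().clean();

    // buffer must not be touched after clean(): its native memory is gone.
  }
}
{code}

The trade-off accepted here is a dependency on an unsupported internal API in
exchange for deterministic release of large off-heap allocations.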


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Fast clean unused direct memory when decompress
> -----------------------------------------------
>
>                 Key: PARQUET-1320
>                 URL: https://issues.apache.org/jira/browse/PARQUET-1320
>             Project: Parquet
>          Issue Type: Improvement
>          Components: parquet-mr
>            Reporter: zhoukang
>            Priority: Major
>              Labels: pull-request-available
>
> When using *NonBlockedDecompressorStream*, which calls
> *SnappyDecompressor.setInput*:
> {code:java}
> public synchronized void setInput(byte[] buffer, int off, int len) {
>   SnappyUtil.validateBuffer(buffer, off, len);
>   if (inputBuffer.capacity() - inputBuffer.position() < len) {
>     ByteBuffer newBuffer = ByteBuffer.allocateDirect(inputBuffer.position() + len);
>     inputBuffer.rewind();
>     newBuffer.put(inputBuffer);
>     inputBuffer = newBuffer;
>   } else {
>     inputBuffer.limit(inputBuffer.position() + len);
>   }
>   inputBuffer.put(buffer, off, len);
> }
> {code}
> If no full GC of the old generation ever runs, the replaced direct buffers
> are never collected, so their off-heap memory is never released and we may
> fail with an off-heap memory leak.
>  
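
To make the failure mode described in the report concrete, a hedged
standalone sketch follows (not part of the patch; the class name and JVM
flags merely illustrate the mechanism). Direct memory is reclaimed only when
the owning ByteBuffer objects are collected, so without full-GC pressure the
direct-memory limit is simply exhausted:

{code:java}
import java.nio.ByteBuffer;

// Illustrative repro, not part of the patch. Run with, for example:
//   java -XX:MaxDirectMemorySize=256m -XX:+DisableExplicitGC DirectLeakDemo
// When the direct-memory limit is hit, allocateDirect() falls back to
// System.gc(); with explicit GC disabled (or simply no full-GC pressure),
// the old, unreachable buffers are never collected, and the JVM fails with
// "OutOfMemoryError: Direct buffer memory".
public class DirectLeakDemo {
  public static void main(String[] args) {
    ByteBuffer inputBuffer = ByteBuffer.allocateDirect(0);
    for (int i = 1; ; i++) {
      // Mimic SnappyDecompressor.setInput: replace the buffer with a larger
      // one and drop the old reference without cleaning it.
      inputBuffer = ByteBuffer.allocateDirect(i * 1024 * 1024);
      System.out.println("now holding a " + i + " MB direct buffer");
    }
  }
}
{code}

On a production-sized heap with little GC activity the same pattern
accumulates native memory without bound, which is the leak this issue
reports.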



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)