You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cm...@apache.org on 2014/07/17 20:20:54 UTC
svn commit: r1611427 - in
/hadoop/common/branches/branch-2/hadoop-common-project: ./ hadoop-common/
hadoop-common/src/ hadoop-common/src/main/java/
hadoop-common/src/main/java/org/apache/hadoop/io/compress/
Author: cmccabe
Date: Thu Jul 17 18:20:53 2014
New Revision: 1611427
URL: http://svn.apache.org/r1611427
Log:
HADOOP-10591. Compression codecs must use pooled direct buffers or deallocate direct buffers when stream is closed (cmccabe)
Modified:
hadoop/common/branches/branch-2/hadoop-common-project/ (props changed)
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/ (props changed)
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt (contents, props changed)
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/ (props changed)
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/ (props changed)
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionCodec.java
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionInputStream.java
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionOutputStream.java
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DefaultCodec.java
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Lz4Codec.java
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/SnappyCodec.java
Propchange: hadoop/common/branches/branch-2/hadoop-common-project/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-common-project:r1611423
Propchange: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common:r1611423
Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1611427&r1=1611426&r2=1611427&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt Thu Jul 17 18:20:53 2014
@@ -51,6 +51,9 @@ Release 2.6.0 - UNRELEASED
HADOOP-9921. daemon scripts should remove pid file on stop call after stop
or process is found not running ( vinayakumarb )
+ HADOOP-10591. Compression codecs must use pooled direct buffers or
+ deallocate direct buffers when stream is closed (cmccabe)
+
Release 2.5.0 - UNRELEASED
INCOMPATIBLE CHANGES
Propchange: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt:r1611423
Propchange: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common/src:r1611423
Propchange: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java:r1611423
Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java?rev=1611427&r1=1611426&r2=1611427&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java Thu Jul 17 18:20:53 2014
@@ -100,7 +100,8 @@ public class BZip2Codec implements Confi
@Override
public CompressionOutputStream createOutputStream(OutputStream out)
throws IOException {
- return createOutputStream(out, createCompressor());
+ return CompressionCodec.Util.
+ createOutputStreamWithCodecPool(this, conf, out);
}
/**
@@ -153,7 +154,8 @@ public class BZip2Codec implements Confi
@Override
public CompressionInputStream createInputStream(InputStream in)
throws IOException {
- return createInputStream(in, createDecompressor());
+ return CompressionCodec.Util.
+ createInputStreamWithCodecPool(this, conf, in);
}
/**
Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionCodec.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionCodec.java?rev=1611427&r1=1611426&r2=1611427&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionCodec.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionCodec.java Thu Jul 17 18:20:53 2014
@@ -24,6 +24,7 @@ import java.io.OutputStream;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
/**
* This class encapsulates a streaming compression/decompression pair.
@@ -113,4 +114,58 @@ public interface CompressionCodec {
* @return the extension including the '.'
*/
String getDefaultExtension();
+
+ static class Util {
+ /**
+ * Create an output stream with a compressor taken from the global CodecPool.
+ *
+ * @param codec The codec to use to create the output stream.
+ * @param conf The configuration to use if we need to create a new codec.
+ * @param out The output stream to wrap.
+ * @return The new output stream
+ * @throws IOException
+ */
+ static CompressionOutputStream createOutputStreamWithCodecPool(
+ CompressionCodec codec, Configuration conf, OutputStream out)
+ throws IOException {
+ Compressor compressor = CodecPool.getCompressor(codec, conf);
+ CompressionOutputStream stream = null;
+ try {
+ stream = codec.createOutputStream(out, compressor);
+ } finally {
+ if (stream == null) {
+ CodecPool.returnCompressor(compressor);
+ } else {
+ stream.setTrackedCompressor(compressor);
+ }
+ }
+ return stream;
+ }
+
+ /**
+ * Create an input stream with a decompressor taken from the global CodecPool.
+ *
+ * @param codec The codec to use to create the input stream.
+ * @param conf The configuration to use if we need to create a new codec.
+ * @param in The input stream to wrap.
+ * @return The new input stream
+ * @throws IOException
+ */
+ static CompressionInputStream createInputStreamWithCodecPool(
+ CompressionCodec codec, Configuration conf, InputStream in)
+ throws IOException {
+ Decompressor decompressor = CodecPool.getDecompressor(codec);
+ CompressionInputStream stream = null;
+ try {
+ stream = codec.createInputStream(in, decompressor);
+ } finally {
+ if (stream == null) {
+ CodecPool.returnDecompressor(decompressor);
+ } else {
+ stream.setTrackedDecompressor(decompressor);
+ }
+ }
+ return stream;
+ }
+ }
}
Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionInputStream.java?rev=1611427&r1=1611426&r2=1611427&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionInputStream.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionInputStream.java Thu Jul 17 18:20:53 2014
@@ -41,6 +41,8 @@ public abstract class CompressionInputSt
protected final InputStream in;
protected long maxAvailableData = 0L;
+ private Decompressor trackedDecompressor;
+
/**
* Create a compression input stream that reads
* the decompressed bytes from the given stream.
@@ -58,6 +60,10 @@ public abstract class CompressionInputSt
@Override
public void close() throws IOException {
in.close();
+ if (trackedDecompressor != null) {
+ CodecPool.returnDecompressor(trackedDecompressor);
+ trackedDecompressor = null;
+ }
}
/**
@@ -112,4 +118,8 @@ public abstract class CompressionInputSt
public boolean seekToNewSource(long targetPos) throws UnsupportedOperationException {
throw new UnsupportedOperationException();
}
+
+ void setTrackedDecompressor(Decompressor decompressor) {
+ trackedDecompressor = decompressor;
+ }
}
Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionOutputStream.java?rev=1611427&r1=1611426&r2=1611427&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionOutputStream.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionOutputStream.java Thu Jul 17 18:20:53 2014
@@ -34,7 +34,13 @@ public abstract class CompressionOutputS
* The output stream to be compressed.
*/
protected final OutputStream out;
-
+
+ /**
+ * If non-null, this is the Compressor object that we should call
+ * CodecPool#returnCompressor on when this stream is closed.
+ */
+ private Compressor trackedCompressor;
+
/**
* Create a compression output stream that writes
* the compressed bytes to the given stream.
@@ -43,11 +49,19 @@ public abstract class CompressionOutputS
protected CompressionOutputStream(OutputStream out) {
this.out = out;
}
-
+
+ void setTrackedCompressor(Compressor compressor) {
+ trackedCompressor = compressor;
+ }
+
@Override
public void close() throws IOException {
finish();
out.close();
+ if (trackedCompressor != null) {
+ CodecPool.returnCompressor(trackedCompressor);
+ trackedCompressor = null;
+ }
}
@Override
Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DefaultCodec.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DefaultCodec.java?rev=1611427&r1=1611426&r2=1611427&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DefaultCodec.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DefaultCodec.java Thu Jul 17 18:20:53 2014
@@ -51,14 +51,8 @@ public class DefaultCodec implements Con
@Override
public CompressionOutputStream createOutputStream(OutputStream out)
throws IOException {
- // This may leak memory if called in a loop. The createCompressor() call
- // may cause allocation of an untracked direct-backed buffer if native
- // libs are being used (even if you close the stream). A Compressor
- // object should be reused between successive calls.
- LOG.warn("DefaultCodec.createOutputStream() may leak memory. "
- + "Create a compressor first.");
- return new CompressorStream(out, createCompressor(),
- conf.getInt("io.file.buffer.size", 4*1024));
+ return CompressionCodec.Util.
+ createOutputStreamWithCodecPool(this, conf, out);
}
@Override
@@ -82,8 +76,8 @@ public class DefaultCodec implements Con
@Override
public CompressionInputStream createInputStream(InputStream in)
throws IOException {
- return new DecompressorStream(in, createDecompressor(),
- conf.getInt("io.file.buffer.size", 4*1024));
+ return CompressionCodec.Util.
+ createInputStreamWithCodecPool(this, conf, in);
}
@Override
Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java?rev=1611427&r1=1611426&r2=1611427&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java Thu Jul 17 18:20:53 2014
@@ -104,10 +104,11 @@ public class GzipCodec extends DefaultCo
@Override
public CompressionOutputStream createOutputStream(OutputStream out)
throws IOException {
- return (ZlibFactory.isNativeZlibLoaded(conf)) ?
- new CompressorStream(out, createCompressor(),
- conf.getInt("io.file.buffer.size", 4*1024)) :
- new GzipOutputStream(out);
+ if (!ZlibFactory.isNativeZlibLoaded(conf)) {
+ return new GzipOutputStream(out);
+ }
+ return CompressionCodec.Util.
+ createOutputStreamWithCodecPool(this, conf, out);
}
@Override
@@ -137,8 +138,9 @@ public class GzipCodec extends DefaultCo
@Override
public CompressionInputStream createInputStream(InputStream in)
- throws IOException {
- return createInputStream(in, null);
+ throws IOException {
+ return CompressionCodec.Util.
+ createInputStreamWithCodecPool(this, conf, in);
}
@Override
Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Lz4Codec.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Lz4Codec.java?rev=1611427&r1=1611426&r2=1611427&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Lz4Codec.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Lz4Codec.java Thu Jul 17 18:20:53 2014
@@ -84,7 +84,8 @@ public class Lz4Codec implements Configu
@Override
public CompressionOutputStream createOutputStream(OutputStream out)
throws IOException {
- return createOutputStream(out, createCompressor());
+ return CompressionCodec.Util.
+ createOutputStreamWithCodecPool(this, conf, out);
}
/**
@@ -157,7 +158,8 @@ public class Lz4Codec implements Configu
@Override
public CompressionInputStream createInputStream(InputStream in)
throws IOException {
- return createInputStream(in, createDecompressor());
+ return CompressionCodec.Util.
+ createInputStreamWithCodecPool(this, conf, in);
}
/**
Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/SnappyCodec.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/SnappyCodec.java?rev=1611427&r1=1611426&r2=1611427&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/SnappyCodec.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/SnappyCodec.java Thu Jul 17 18:20:53 2014
@@ -95,7 +95,8 @@ public class SnappyCodec implements Conf
@Override
public CompressionOutputStream createOutputStream(OutputStream out)
throws IOException {
- return createOutputStream(out, createCompressor());
+ return CompressionCodec.Util.
+ createOutputStreamWithCodecPool(this, conf, out);
}
/**
@@ -158,7 +159,8 @@ public class SnappyCodec implements Conf
@Override
public CompressionInputStream createInputStream(InputStream in)
throws IOException {
- return createInputStream(in, createDecompressor());
+ return CompressionCodec.Util.
+ createInputStreamWithCodecPool(this, conf, in);
}
/**