You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cm...@apache.org on 2014/07/17 20:20:54 UTC

svn commit: r1611427 - in /hadoop/common/branches/branch-2/hadoop-common-project: ./ hadoop-common/ hadoop-common/src/ hadoop-common/src/main/java/ hadoop-common/src/main/java/org/apache/hadoop/io/compress/

Author: cmccabe
Date: Thu Jul 17 18:20:53 2014
New Revision: 1611427

URL: http://svn.apache.org/r1611427
Log:
HADOOP-10591. Compression codecs must used pooled direct buffers or deallocate direct buffers when stream is closed (cmccabe)

Modified:
    hadoop/common/branches/branch-2/hadoop-common-project/   (props changed)
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/   (props changed)
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt   (contents, props changed)
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/   (props changed)
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/   (props changed)
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionCodec.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionInputStream.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionOutputStream.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DefaultCodec.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Lz4Codec.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/SnappyCodec.java

Propchange: hadoop/common/branches/branch-2/hadoop-common-project/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-common-project:r1611423

Propchange: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common:r1611423

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1611427&r1=1611426&r2=1611427&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt Thu Jul 17 18:20:53 2014
@@ -51,6 +51,9 @@ Release 2.6.0 - UNRELEASED
     HADOOP-9921. daemon scripts should remove pid file on stop call after stop
     or process is found not running ( vinayakumarb )
 
+    HADOOP-10591.  Compression codecs must used pooled direct buffers or
+    deallocate direct buffers when stream is closed (cmccabe)
+
 Release 2.5.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Propchange: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt:r1611423

Propchange: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common/src:r1611423

Propchange: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java:r1611423

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java?rev=1611427&r1=1611426&r2=1611427&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java Thu Jul 17 18:20:53 2014
@@ -100,7 +100,8 @@ public class BZip2Codec implements Confi
   @Override
   public CompressionOutputStream createOutputStream(OutputStream out)
       throws IOException {
-    return createOutputStream(out, createCompressor());
+    return CompressionCodec.Util.
+        createOutputStreamWithCodecPool(this, conf, out);
   }
 
   /**
@@ -153,7 +154,8 @@ public class BZip2Codec implements Confi
   @Override
   public CompressionInputStream createInputStream(InputStream in)
       throws IOException {
-    return createInputStream(in, createDecompressor());
+    return CompressionCodec.Util.
+        createInputStreamWithCodecPool(this, conf, in);
   }
 
   /**

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionCodec.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionCodec.java?rev=1611427&r1=1611426&r2=1611427&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionCodec.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionCodec.java Thu Jul 17 18:20:53 2014
@@ -24,6 +24,7 @@ import java.io.OutputStream;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
 
 /**
  * This class encapsulates a streaming compression/decompression pair.
@@ -113,4 +114,58 @@ public interface CompressionCodec {
    * @return the extension including the '.'
    */
   String getDefaultExtension();
+
+  static class Util {
+    /**
+     * Create an output stream with a codec taken from the global CodecPool.
+     *
+     * @param codec       The codec to use to create the output stream.
+     * @param conf        The configuration to use if we need to create a new codec.
+     * @param out         The output stream to wrap.
+     * @return            The new output stream
+     * @throws IOException
+     */
+    static CompressionOutputStream createOutputStreamWithCodecPool(
+        CompressionCodec codec, Configuration conf, OutputStream out)
+        throws IOException {
+      Compressor compressor = CodecPool.getCompressor(codec, conf);
+      CompressionOutputStream stream = null;
+      try {
+        stream = codec.createOutputStream(out, compressor);
+      } finally {
+        if (stream == null) {
+          CodecPool.returnCompressor(compressor);
+        } else {
+          stream.setTrackedCompressor(compressor);
+        }
+      }
+      return stream;
+    }
+
+    /**
+     * Create an input stream with a codec taken from the global CodecPool.
+     *
+     * @param codec       The codec to use to create the input stream.
+     * @param conf        The configuration to use if we need to create a new codec.
+     * @param in          The input stream to wrap.
+     * @return            The new input stream
+     * @throws IOException
+     */
+    static CompressionInputStream createInputStreamWithCodecPool(
+        CompressionCodec codec,  Configuration conf, InputStream in)
+          throws IOException {
+      Decompressor decompressor = CodecPool.getDecompressor(codec);
+      CompressionInputStream stream = null;
+      try {
+        stream = codec.createInputStream(in, decompressor);
+      } finally {
+        if (stream == null) {
+          CodecPool.returnDecompressor(decompressor);
+        } else {
+          stream.setTrackedDecompressor(decompressor);
+        }
+      }
+      return stream;
+    }
+  }
 }

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionInputStream.java?rev=1611427&r1=1611426&r2=1611427&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionInputStream.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionInputStream.java Thu Jul 17 18:20:53 2014
@@ -41,6 +41,8 @@ public abstract class CompressionInputSt
   protected final InputStream in;
   protected long maxAvailableData = 0L;
 
+  private Decompressor trackedDecompressor;
+
   /**
    * Create a compression input stream that reads
    * the decompressed bytes from the given stream.
@@ -58,6 +60,10 @@ public abstract class CompressionInputSt
   @Override
   public void close() throws IOException {
     in.close();
+    if (trackedDecompressor != null) {
+      CodecPool.returnDecompressor(trackedDecompressor);
+      trackedDecompressor = null;
+    }
   }
   
   /**
@@ -112,4 +118,8 @@ public abstract class CompressionInputSt
   public boolean seekToNewSource(long targetPos) throws UnsupportedOperationException {
     throw new UnsupportedOperationException();
   }
+
+  void setTrackedDecompressor(Decompressor decompressor) {
+    trackedDecompressor = decompressor;
+  }
 }

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionOutputStream.java?rev=1611427&r1=1611426&r2=1611427&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionOutputStream.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionOutputStream.java Thu Jul 17 18:20:53 2014
@@ -34,7 +34,13 @@ public abstract class CompressionOutputS
    * The output stream to be compressed. 
    */
   protected final OutputStream out;
-  
+
+  /**
+   * If non-null, this is the Compressor object that we should call
+   * CodecPool#returnCompressor on when this stream is closed.
+   */
+  private Compressor trackedCompressor;
+
   /**
    * Create a compression output stream that writes
    * the compressed bytes to the given stream.
@@ -43,11 +49,19 @@ public abstract class CompressionOutputS
   protected CompressionOutputStream(OutputStream out) {
     this.out = out;
   }
-  
+
+  void setTrackedCompressor(Compressor compressor) {
+    trackedCompressor = compressor;
+  }
+
   @Override
   public void close() throws IOException {
     finish();
     out.close();
+    if (trackedCompressor != null) {
+      CodecPool.returnCompressor(trackedCompressor);
+      trackedCompressor = null;
+    }
   }
   
   @Override

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DefaultCodec.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DefaultCodec.java?rev=1611427&r1=1611426&r2=1611427&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DefaultCodec.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DefaultCodec.java Thu Jul 17 18:20:53 2014
@@ -51,14 +51,8 @@ public class DefaultCodec implements Con
   @Override
   public CompressionOutputStream createOutputStream(OutputStream out) 
   throws IOException {
-    // This may leak memory if called in a loop. The createCompressor() call
-    // may cause allocation of an untracked direct-backed buffer if native
-    // libs are being used (even if you close the stream).  A Compressor
-    // object should be reused between successive calls.
-    LOG.warn("DefaultCodec.createOutputStream() may leak memory. "
-        + "Create a compressor first.");
-    return new CompressorStream(out, createCompressor(), 
-                                conf.getInt("io.file.buffer.size", 4*1024));
+    return CompressionCodec.Util.
+        createOutputStreamWithCodecPool(this, conf, out);
   }
 
   @Override
@@ -82,8 +76,8 @@ public class DefaultCodec implements Con
   @Override
   public CompressionInputStream createInputStream(InputStream in) 
   throws IOException {
-    return new DecompressorStream(in, createDecompressor(),
-                                  conf.getInt("io.file.buffer.size", 4*1024));
+    return CompressionCodec.Util.
+        createInputStreamWithCodecPool(this, conf, in);
   }
 
   @Override

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java?rev=1611427&r1=1611426&r2=1611427&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java Thu Jul 17 18:20:53 2014
@@ -104,10 +104,11 @@ public class GzipCodec extends DefaultCo
   @Override
   public CompressionOutputStream createOutputStream(OutputStream out) 
     throws IOException {
-    return (ZlibFactory.isNativeZlibLoaded(conf)) ?
-               new CompressorStream(out, createCompressor(),
-                                    conf.getInt("io.file.buffer.size", 4*1024)) :
-               new GzipOutputStream(out);
+    if (!ZlibFactory.isNativeZlibLoaded(conf)) {
+      return new GzipOutputStream(out);
+    }
+    return CompressionCodec.Util.
+        createOutputStreamWithCodecPool(this, conf, out);
   }
   
   @Override
@@ -137,8 +138,9 @@ public class GzipCodec extends DefaultCo
 
   @Override
   public CompressionInputStream createInputStream(InputStream in)
-  throws IOException {
-    return createInputStream(in, null);
+      throws IOException {
+    return CompressionCodec.Util.
+        createInputStreamWithCodecPool(this, conf, in);
   }
 
   @Override

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Lz4Codec.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Lz4Codec.java?rev=1611427&r1=1611426&r2=1611427&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Lz4Codec.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Lz4Codec.java Thu Jul 17 18:20:53 2014
@@ -84,7 +84,8 @@ public class Lz4Codec implements Configu
   @Override
   public CompressionOutputStream createOutputStream(OutputStream out)
       throws IOException {
-    return createOutputStream(out, createCompressor());
+    return CompressionCodec.Util.
+        createOutputStreamWithCodecPool(this, conf, out);
   }
 
   /**
@@ -157,7 +158,8 @@ public class Lz4Codec implements Configu
   @Override
   public CompressionInputStream createInputStream(InputStream in)
       throws IOException {
-    return createInputStream(in, createDecompressor());
+    return CompressionCodec.Util.
+        createInputStreamWithCodecPool(this, conf, in);
   }
 
   /**

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/SnappyCodec.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/SnappyCodec.java?rev=1611427&r1=1611426&r2=1611427&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/SnappyCodec.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/SnappyCodec.java Thu Jul 17 18:20:53 2014
@@ -95,7 +95,8 @@ public class SnappyCodec implements Conf
   @Override
   public CompressionOutputStream createOutputStream(OutputStream out)
       throws IOException {
-    return createOutputStream(out, createCompressor());
+    return CompressionCodec.Util.
+        createOutputStreamWithCodecPool(this, conf, out);
   }
 
   /**
@@ -158,7 +159,8 @@ public class SnappyCodec implements Conf
   @Override
   public CompressionInputStream createInputStream(InputStream in)
       throws IOException {
-    return createInputStream(in, createDecompressor());
+    return CompressionCodec.Util.
+        createInputStreamWithCodecPool(this, conf, in);
   }
 
   /**