You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2017/05/16 16:10:20 UTC

[03/50] [abbrv] hadoop git commit: HADOOP-14376. Memory leak when reading a compressed file using the native library. Contributed by Eli Acherkan

HADOOP-14376. Memory leak when reading a compressed file using the native library. Contributed by Eli Acherkan


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7bc21722
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7bc21722
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7bc21722

Branch: refs/heads/YARN-2915
Commit: 7bc217224891b7f7f0a2e35e37e46b36d8c5309d
Parents: 6c35001
Author: Jason Lowe <jl...@yahoo-inc.com>
Authored: Fri May 12 16:54:08 2017 -0500
Committer: Jason Lowe <jl...@yahoo-inc.com>
Committed: Fri May 12 16:54:08 2017 -0500

----------------------------------------------------------------------
 .../apache/hadoop/io/compress/BZip2Codec.java   |  20 ++--
 .../apache/hadoop/io/compress/CodecPool.java    |  10 +-
 .../io/compress/CompressionInputStream.java     |  11 +-
 .../io/compress/CompressionOutputStream.java    |  16 ++-
 .../hadoop/io/compress/CompressorStream.java    |   3 +-
 .../hadoop/io/compress/DecompressorStream.java  |   7 +-
 .../apache/hadoop/io/compress/TestCodec.java    | 103 +++++++++++--------
 7 files changed, 102 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7bc21722/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
index 08b4d4d..331606e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
@@ -336,15 +336,11 @@ public class BZip2Codec implements Configurable, SplittableCompressionCodec {
     }
 
     public void close() throws IOException {
-      if (needsReset) {
-        // In the case that nothing is written to this stream, we still need to
-        // write out the header before closing, otherwise the stream won't be
-        // recognized by BZip2CompressionInputStream.
-        internalReset();
+      try {
+        super.close();
+      } finally {
+        output.close();
       }
-      this.output.flush();
-      this.output.close();
-      needsReset = true;
     }
 
   }// end of class BZip2CompressionOutputStream
@@ -454,8 +450,12 @@ public class BZip2Codec implements Configurable, SplittableCompressionCodec {
 
     public void close() throws IOException {
       if (!needsReset) {
-        input.close();
-        needsReset = true;
+        try {
+          input.close();
+          needsReset = true;
+        } finally {
+          super.close();
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7bc21722/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java
index bb566de..01bffa7 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java
@@ -157,7 +157,10 @@ public class CodecPool {
         LOG.debug("Got recycled compressor");
       }
     }
-    updateLeaseCount(compressorCounts, compressor, 1);
+    if (compressor != null &&
+        !compressor.getClass().isAnnotationPresent(DoNotPool.class)) {
+      updateLeaseCount(compressorCounts, compressor, 1);
+    }
     return compressor;
   }
   
@@ -184,7 +187,10 @@ public class CodecPool {
         LOG.debug("Got recycled decompressor");
       }
     }
-    updateLeaseCount(decompressorCounts, decompressor, 1);
+    if (decompressor != null &&
+        !decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
+      updateLeaseCount(decompressorCounts, decompressor, 1);
+    }
     return decompressor;
   }
   

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7bc21722/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionInputStream.java
index cf3ac40..2dfa30b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionInputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionInputStream.java
@@ -59,10 +59,13 @@ public abstract class CompressionInputStream extends InputStream implements Seek
 
   @Override
   public void close() throws IOException {
-    in.close();
-    if (trackedDecompressor != null) {
-      CodecPool.returnDecompressor(trackedDecompressor);
-      trackedDecompressor = null;
+    try {
+      in.close();
+    } finally {
+      if (trackedDecompressor != null) {
+        CodecPool.returnDecompressor(trackedDecompressor);
+        trackedDecompressor = null;
+      }
     }
   }
   

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7bc21722/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionOutputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionOutputStream.java
index 00e272a..71c7f32 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionOutputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionOutputStream.java
@@ -56,11 +56,17 @@ public abstract class CompressionOutputStream extends OutputStream {
 
   @Override
   public void close() throws IOException {
-    finish();
-    out.close();
-    if (trackedCompressor != null) {
-      CodecPool.returnCompressor(trackedCompressor);
-      trackedCompressor = null;
+    try {
+      finish();
+    } finally {
+      try {
+        out.close();
+      } finally {
+        if (trackedCompressor != null) {
+          CodecPool.returnCompressor(trackedCompressor);
+          trackedCompressor = null;
+        }
+      }
     }
   }
   

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7bc21722/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressorStream.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressorStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressorStream.java
index 34426f8..be5eee0 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressorStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressorStream.java
@@ -103,10 +103,9 @@ public class CompressorStream extends CompressionOutputStream {
   public void close() throws IOException {
     if (!closed) {
       try {
-        finish();
+        super.close();
       }
       finally {
-        out.close();
         closed = true;
       }
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7bc21722/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DecompressorStream.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DecompressorStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DecompressorStream.java
index dab366a..756ccf3 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DecompressorStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DecompressorStream.java
@@ -221,8 +221,11 @@ public class DecompressorStream extends CompressionInputStream {
   @Override
   public void close() throws IOException {
     if (!closed) {
-      in.close();
-      closed = true;
+      try {
+        super.close();
+      } finally {
+        closed = true;
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7bc21722/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java
index 3955aa2..1ea9dc8 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java
@@ -205,66 +205,83 @@ public class TestCodec {
     
     // Compress data
     DataOutputBuffer compressedDataBuffer = new DataOutputBuffer();
-    CompressionOutputStream deflateFilter = 
+    int leasedCompressorsBefore = codec.getCompressorType() == null ? -1
+        : CodecPool.getLeasedCompressorsCount(codec);
+    try (CompressionOutputStream deflateFilter =
       codec.createOutputStream(compressedDataBuffer);
-    DataOutputStream deflateOut = 
-      new DataOutputStream(new BufferedOutputStream(deflateFilter));
-    deflateOut.write(data.getData(), 0, data.getLength());
-    deflateOut.flush();
-    deflateFilter.finish();
+      DataOutputStream deflateOut =
+        new DataOutputStream(new BufferedOutputStream(deflateFilter))) {
+      deflateOut.write(data.getData(), 0, data.getLength());
+      deflateOut.flush();
+      deflateFilter.finish();
+    }
+    if (leasedCompressorsBefore > -1) {
+      assertEquals("leased compressor not returned to the codec pool",
+          leasedCompressorsBefore, CodecPool.getLeasedCompressorsCount(codec));
+    }
     LOG.info("Finished compressing data");
     
     // De-compress data
     DataInputBuffer deCompressedDataBuffer = new DataInputBuffer();
     deCompressedDataBuffer.reset(compressedDataBuffer.getData(), 0, 
                                  compressedDataBuffer.getLength());
-    CompressionInputStream inflateFilter = 
-      codec.createInputStream(deCompressedDataBuffer);
-    DataInputStream inflateIn = 
-      new DataInputStream(new BufferedInputStream(inflateFilter));
-
-    // Check
     DataInputBuffer originalData = new DataInputBuffer();
-    originalData.reset(data.getData(), 0, data.getLength());
-    DataInputStream originalIn = new DataInputStream(new BufferedInputStream(originalData));
-    for(int i=0; i < count; ++i) {
-      RandomDatum k1 = new RandomDatum();
-      RandomDatum v1 = new RandomDatum();
-      k1.readFields(originalIn);
-      v1.readFields(originalIn);
+    int leasedDecompressorsBefore =
+        CodecPool.getLeasedDecompressorsCount(codec);
+    try (CompressionInputStream inflateFilter =
+      codec.createInputStream(deCompressedDataBuffer);
+      DataInputStream inflateIn =
+        new DataInputStream(new BufferedInputStream(inflateFilter))) {
+
+      // Check
+      originalData.reset(data.getData(), 0, data.getLength());
+      DataInputStream originalIn =
+          new DataInputStream(new BufferedInputStream(originalData));
+      for(int i=0; i < count; ++i) {
+        RandomDatum k1 = new RandomDatum();
+        RandomDatum v1 = new RandomDatum();
+        k1.readFields(originalIn);
+        v1.readFields(originalIn);
       
-      RandomDatum k2 = new RandomDatum();
-      RandomDatum v2 = new RandomDatum();
-      k2.readFields(inflateIn);
-      v2.readFields(inflateIn);
-      assertTrue("original and compressed-then-decompressed-output not equal",
-                 k1.equals(k2) && v1.equals(v2));
+        RandomDatum k2 = new RandomDatum();
+        RandomDatum v2 = new RandomDatum();
+        k2.readFields(inflateIn);
+        v2.readFields(inflateIn);
+        assertTrue("original and compressed-then-decompressed-output not equal",
+                   k1.equals(k2) && v1.equals(v2));
       
-      // original and compressed-then-decompressed-output have the same hashCode
-      Map<RandomDatum, String> m = new HashMap<RandomDatum, String>();
-      m.put(k1, k1.toString());
-      m.put(v1, v1.toString());
-      String result = m.get(k2);
-      assertEquals("k1 and k2 hashcode not equal", result, k1.toString());
-      result = m.get(v2);
-      assertEquals("v1 and v2 hashcode not equal", result, v1.toString());
+        // original and compressed-then-decompressed-output have the same
+        // hashCode
+        Map<RandomDatum, String> m = new HashMap<RandomDatum, String>();
+        m.put(k1, k1.toString());
+        m.put(v1, v1.toString());
+        String result = m.get(k2);
+        assertEquals("k1 and k2 hashcode not equal", result, k1.toString());
+        result = m.get(v2);
+        assertEquals("v1 and v2 hashcode not equal", result, v1.toString());
+      }
     }
+    assertEquals("leased decompressor not returned to the codec pool",
+        leasedDecompressorsBefore,
+        CodecPool.getLeasedDecompressorsCount(codec));
 
     // De-compress data byte-at-a-time
     originalData.reset(data.getData(), 0, data.getLength());
     deCompressedDataBuffer.reset(compressedDataBuffer.getData(), 0, 
                                  compressedDataBuffer.getLength());
-    inflateFilter = 
+    try (CompressionInputStream inflateFilter =
       codec.createInputStream(deCompressedDataBuffer);
-
-    // Check
-    originalIn = new DataInputStream(new BufferedInputStream(originalData));
-    int expected;
-    do {
-      expected = originalIn.read();
-      assertEquals("Inflated stream read by byte does not match",
-        expected, inflateFilter.read());
-    } while (expected != -1);
+      DataInputStream originalIn =
+        new DataInputStream(new BufferedInputStream(originalData))) {
+
+      // Check
+      int expected;
+      do {
+        expected = originalIn.read();
+        assertEquals("Inflated stream read by byte does not match",
+            expected, inflateFilter.read());
+      } while (expected != -1);
+    }
 
     LOG.info("SUCCESS! Completed checking " + count + " records");
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org