You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by jl...@apache.org on 2017/05/12 21:55:19 UTC
hadoop git commit: HADOOP-14376. Memory leak when reading a
compressed file using the native library. Contributed by Eli Acherkan.
Repository: hadoop
Updated Branches:
refs/heads/trunk 6c35001b9 -> 7bc217224
HADOOP-14376. Memory leak when reading a compressed file using the native library. Contributed by Eli Acherkan.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7bc21722
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7bc21722
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7bc21722
Branch: refs/heads/trunk
Commit: 7bc217224891b7f7f0a2e35e37e46b36d8c5309d
Parents: 6c35001
Author: Jason Lowe <jl...@yahoo-inc.com>
Authored: Fri May 12 16:54:08 2017 -0500
Committer: Jason Lowe <jl...@yahoo-inc.com>
Committed: Fri May 12 16:54:08 2017 -0500
----------------------------------------------------------------------
.../apache/hadoop/io/compress/BZip2Codec.java | 20 ++--
.../apache/hadoop/io/compress/CodecPool.java | 10 +-
.../io/compress/CompressionInputStream.java | 11 +-
.../io/compress/CompressionOutputStream.java | 16 ++-
.../hadoop/io/compress/CompressorStream.java | 3 +-
.../hadoop/io/compress/DecompressorStream.java | 7 +-
.../apache/hadoop/io/compress/TestCodec.java | 103 +++++++++++--------
7 files changed, 102 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7bc21722/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
index 08b4d4d..331606e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
@@ -336,15 +336,11 @@ public class BZip2Codec implements Configurable, SplittableCompressionCodec {
}
public void close() throws IOException {
- if (needsReset) {
- // In the case that nothing is written to this stream, we still need to
- // write out the header before closing, otherwise the stream won't be
- // recognized by BZip2CompressionInputStream.
- internalReset();
+ try {
+ super.close();
+ } finally {
+ output.close();
}
- this.output.flush();
- this.output.close();
- needsReset = true;
}
}// end of class BZip2CompressionOutputStream
@@ -454,8 +450,12 @@ public class BZip2Codec implements Configurable, SplittableCompressionCodec {
public void close() throws IOException {
if (!needsReset) {
- input.close();
- needsReset = true;
+ try {
+ input.close();
+ needsReset = true;
+ } finally {
+ super.close();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7bc21722/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java
index bb566de..01bffa7 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java
@@ -157,7 +157,10 @@ public class CodecPool {
LOG.debug("Got recycled compressor");
}
}
- updateLeaseCount(compressorCounts, compressor, 1);
+ if (compressor != null &&
+ !compressor.getClass().isAnnotationPresent(DoNotPool.class)) {
+ updateLeaseCount(compressorCounts, compressor, 1);
+ }
return compressor;
}
@@ -184,7 +187,10 @@ public class CodecPool {
LOG.debug("Got recycled decompressor");
}
}
- updateLeaseCount(decompressorCounts, decompressor, 1);
+ if (decompressor != null &&
+ !decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
+ updateLeaseCount(decompressorCounts, decompressor, 1);
+ }
return decompressor;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7bc21722/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionInputStream.java
index cf3ac40..2dfa30b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionInputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionInputStream.java
@@ -59,10 +59,13 @@ public abstract class CompressionInputStream extends InputStream implements Seek
@Override
public void close() throws IOException {
- in.close();
- if (trackedDecompressor != null) {
- CodecPool.returnDecompressor(trackedDecompressor);
- trackedDecompressor = null;
+ try {
+ in.close();
+ } finally {
+ if (trackedDecompressor != null) {
+ CodecPool.returnDecompressor(trackedDecompressor);
+ trackedDecompressor = null;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7bc21722/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionOutputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionOutputStream.java
index 00e272a..71c7f32 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionOutputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionOutputStream.java
@@ -56,11 +56,17 @@ public abstract class CompressionOutputStream extends OutputStream {
@Override
public void close() throws IOException {
- finish();
- out.close();
- if (trackedCompressor != null) {
- CodecPool.returnCompressor(trackedCompressor);
- trackedCompressor = null;
+ try {
+ finish();
+ } finally {
+ try {
+ out.close();
+ } finally {
+ if (trackedCompressor != null) {
+ CodecPool.returnCompressor(trackedCompressor);
+ trackedCompressor = null;
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7bc21722/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressorStream.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressorStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressorStream.java
index 34426f8..be5eee0 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressorStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressorStream.java
@@ -103,10 +103,9 @@ public class CompressorStream extends CompressionOutputStream {
public void close() throws IOException {
if (!closed) {
try {
- finish();
+ super.close();
}
finally {
- out.close();
closed = true;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7bc21722/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DecompressorStream.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DecompressorStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DecompressorStream.java
index dab366a..756ccf3 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DecompressorStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DecompressorStream.java
@@ -221,8 +221,11 @@ public class DecompressorStream extends CompressionInputStream {
@Override
public void close() throws IOException {
if (!closed) {
- in.close();
- closed = true;
+ try {
+ super.close();
+ } finally {
+ closed = true;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7bc21722/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java
index 3955aa2..1ea9dc8 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java
@@ -205,66 +205,83 @@ public class TestCodec {
// Compress data
DataOutputBuffer compressedDataBuffer = new DataOutputBuffer();
- CompressionOutputStream deflateFilter =
+ int leasedCompressorsBefore = codec.getCompressorType() == null ? -1
+ : CodecPool.getLeasedCompressorsCount(codec);
+ try (CompressionOutputStream deflateFilter =
codec.createOutputStream(compressedDataBuffer);
- DataOutputStream deflateOut =
- new DataOutputStream(new BufferedOutputStream(deflateFilter));
- deflateOut.write(data.getData(), 0, data.getLength());
- deflateOut.flush();
- deflateFilter.finish();
+ DataOutputStream deflateOut =
+ new DataOutputStream(new BufferedOutputStream(deflateFilter))) {
+ deflateOut.write(data.getData(), 0, data.getLength());
+ deflateOut.flush();
+ deflateFilter.finish();
+ }
+ if (leasedCompressorsBefore > -1) {
+ assertEquals("leased compressor not returned to the codec pool",
+ leasedCompressorsBefore, CodecPool.getLeasedCompressorsCount(codec));
+ }
LOG.info("Finished compressing data");
// De-compress data
DataInputBuffer deCompressedDataBuffer = new DataInputBuffer();
deCompressedDataBuffer.reset(compressedDataBuffer.getData(), 0,
compressedDataBuffer.getLength());
- CompressionInputStream inflateFilter =
- codec.createInputStream(deCompressedDataBuffer);
- DataInputStream inflateIn =
- new DataInputStream(new BufferedInputStream(inflateFilter));
-
- // Check
DataInputBuffer originalData = new DataInputBuffer();
- originalData.reset(data.getData(), 0, data.getLength());
- DataInputStream originalIn = new DataInputStream(new BufferedInputStream(originalData));
- for(int i=0; i < count; ++i) {
- RandomDatum k1 = new RandomDatum();
- RandomDatum v1 = new RandomDatum();
- k1.readFields(originalIn);
- v1.readFields(originalIn);
+ int leasedDecompressorsBefore =
+ CodecPool.getLeasedDecompressorsCount(codec);
+ try (CompressionInputStream inflateFilter =
+ codec.createInputStream(deCompressedDataBuffer);
+ DataInputStream inflateIn =
+ new DataInputStream(new BufferedInputStream(inflateFilter))) {
+
+ // Check
+ originalData.reset(data.getData(), 0, data.getLength());
+ DataInputStream originalIn =
+ new DataInputStream(new BufferedInputStream(originalData));
+ for(int i=0; i < count; ++i) {
+ RandomDatum k1 = new RandomDatum();
+ RandomDatum v1 = new RandomDatum();
+ k1.readFields(originalIn);
+ v1.readFields(originalIn);
- RandomDatum k2 = new RandomDatum();
- RandomDatum v2 = new RandomDatum();
- k2.readFields(inflateIn);
- v2.readFields(inflateIn);
- assertTrue("original and compressed-then-decompressed-output not equal",
- k1.equals(k2) && v1.equals(v2));
+ RandomDatum k2 = new RandomDatum();
+ RandomDatum v2 = new RandomDatum();
+ k2.readFields(inflateIn);
+ v2.readFields(inflateIn);
+ assertTrue("original and compressed-then-decompressed-output not equal",
+ k1.equals(k2) && v1.equals(v2));
- // original and compressed-then-decompressed-output have the same hashCode
- Map<RandomDatum, String> m = new HashMap<RandomDatum, String>();
- m.put(k1, k1.toString());
- m.put(v1, v1.toString());
- String result = m.get(k2);
- assertEquals("k1 and k2 hashcode not equal", result, k1.toString());
- result = m.get(v2);
- assertEquals("v1 and v2 hashcode not equal", result, v1.toString());
+ // original and compressed-then-decompressed-output have the same
+ // hashCode
+ Map<RandomDatum, String> m = new HashMap<RandomDatum, String>();
+ m.put(k1, k1.toString());
+ m.put(v1, v1.toString());
+ String result = m.get(k2);
+ assertEquals("k1 and k2 hashcode not equal", result, k1.toString());
+ result = m.get(v2);
+ assertEquals("v1 and v2 hashcode not equal", result, v1.toString());
+ }
}
+ assertEquals("leased decompressor not returned to the codec pool",
+ leasedDecompressorsBefore,
+ CodecPool.getLeasedDecompressorsCount(codec));
// De-compress data byte-at-a-time
originalData.reset(data.getData(), 0, data.getLength());
deCompressedDataBuffer.reset(compressedDataBuffer.getData(), 0,
compressedDataBuffer.getLength());
- inflateFilter =
+ try (CompressionInputStream inflateFilter =
codec.createInputStream(deCompressedDataBuffer);
-
- // Check
- originalIn = new DataInputStream(new BufferedInputStream(originalData));
- int expected;
- do {
- expected = originalIn.read();
- assertEquals("Inflated stream read by byte does not match",
- expected, inflateFilter.read());
- } while (expected != -1);
+ DataInputStream originalIn =
+ new DataInputStream(new BufferedInputStream(originalData))) {
+
+ // Check
+ int expected;
+ do {
+ expected = originalIn.read();
+ assertEquals("Inflated stream read by byte does not match",
+ expected, inflateFilter.read());
+ } while (expected != -1);
+ }
LOG.info("SUCCESS! Completed checking " + count + " records");
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org