You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2014/11/14 12:46:07 UTC

hadoop git commit: MAPREDUCE-5918. LineRecordReader can return the same decompressor to CodecPool multiple times (Sergey Murylev via raviprak)

Repository: hadoop
Updated Branches:
  refs/heads/trunk d005404ef -> 1a1dcce82


MAPREDUCE-5918. LineRecordReader can return the same decompressor to CodecPool multiple times (Sergey Murylev via raviprak)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1a1dcce8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1a1dcce8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1a1dcce8

Branch: refs/heads/trunk
Commit: 1a1dcce827d8747a5629151afa335598fbc94f9e
Parents: d005404
Author: Ravi Prakash <ra...@apache.org>
Authored: Fri Nov 14 03:45:53 2014 -0800
Committer: Ravi Prakash <ra...@apache.org>
Committed: Fri Nov 14 03:45:53 2014 -0800

----------------------------------------------------------------------
 .../apache/hadoop/io/compress/CodecPool.java    | 54 +++++++++++---------
 .../hadoop/io/compress/TestCodecPool.java       | 47 ++++++++++++++---
 hadoop-mapreduce-project/CHANGES.txt            |  3 ++
 .../apache/hadoop/mapred/LineRecordReader.java  |  1 +
 .../mapreduce/lib/input/LineRecordReader.java   |  1 +
 .../hadoop/mapred/TestLineRecordReader.java     | 37 ++++++++++++++
 .../lib/input/TestLineRecordReader.java         | 38 ++++++++++++++
 7 files changed, 150 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/1a1dcce8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java
index 11d88f1..bb566de 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java
@@ -17,9 +17,9 @@
  */
 package org.apache.hadoop.io.compress;
 
-import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.HashMap;
-import java.util.List;
+import java.util.Set;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -47,15 +47,15 @@ public class CodecPool {
    * A global compressor pool used to save the expensive 
    * construction/destruction of (possibly native) decompression codecs.
    */
-  private static final Map<Class<Compressor>, List<Compressor>> compressorPool = 
-    new HashMap<Class<Compressor>, List<Compressor>>();
+  private static final Map<Class<Compressor>, Set<Compressor>> compressorPool =
+    new HashMap<Class<Compressor>, Set<Compressor>>();
   
   /**
    * A global decompressor pool used to save the expensive 
    * construction/destruction of (possibly native) decompression codecs.
    */
-  private static final Map<Class<Decompressor>, List<Decompressor>> decompressorPool = 
-    new HashMap<Class<Decompressor>, List<Decompressor>>();
+  private static final Map<Class<Decompressor>, Set<Decompressor>> decompressorPool =
+    new HashMap<Class<Decompressor>, Set<Decompressor>>();
 
   private static <T> LoadingCache<Class<T>, AtomicInteger> createCache(
       Class<T> klass) {
@@ -80,20 +80,21 @@ public class CodecPool {
   private static final LoadingCache<Class<Decompressor>, AtomicInteger> decompressorCounts =
       createCache(Decompressor.class);
 
-  private static <T> T borrow(Map<Class<T>, List<T>> pool,
+  private static <T> T borrow(Map<Class<T>, Set<T>> pool,
                              Class<? extends T> codecClass) {
     T codec = null;
     
     // Check if an appropriate codec is available
-    List<T> codecList;
+    Set<T> codecSet;
     synchronized (pool) {
-      codecList = pool.get(codecClass);
+      codecSet = pool.get(codecClass);
     }
 
-    if (codecList != null) {
-      synchronized (codecList) {
-        if (!codecList.isEmpty()) {
-          codec = codecList.remove(codecList.size() - 1);
+    if (codecSet != null) {
+      synchronized (codecSet) {
+        if (!codecSet.isEmpty()) {
+          codec = codecSet.iterator().next();
+          codecSet.remove(codec);
         }
       }
     }
@@ -101,22 +102,23 @@ public class CodecPool {
     return codec;
   }
 
-  private static <T> void payback(Map<Class<T>, List<T>> pool, T codec) {
+  private static <T> boolean payback(Map<Class<T>, Set<T>> pool, T codec) {
     if (codec != null) {
       Class<T> codecClass = ReflectionUtils.getClass(codec);
-      List<T> codecList;
+      Set<T> codecSet;
       synchronized (pool) {
-        codecList = pool.get(codecClass);
-        if (codecList == null) {
-          codecList = new ArrayList<T>();
-          pool.put(codecClass, codecList);
+        codecSet = pool.get(codecClass);
+        if (codecSet == null) {
+          codecSet = new HashSet<T>();
+          pool.put(codecClass, codecSet);
         }
       }
 
-      synchronized (codecList) {
-        codecList.add(codec);
+      synchronized (codecSet) {
+        return codecSet.add(codec);
       }
     }
+    return false;
   }
   
   @SuppressWarnings("unchecked")
@@ -200,8 +202,9 @@ public class CodecPool {
       return;
     }
     compressor.reset();
-    payback(compressorPool, compressor);
-    updateLeaseCount(compressorCounts, compressor, -1);
+    if (payback(compressorPool, compressor)) {
+      updateLeaseCount(compressorCounts, compressor, -1);
+    }
   }
   
   /**
@@ -219,8 +222,9 @@ public class CodecPool {
       return;
     }
     decompressor.reset();
-    payback(decompressorPool, decompressor);
-    updateLeaseCount(decompressorCounts, decompressor, -1);
+    if (payback(decompressorPool, decompressor)) {
+      updateLeaseCount(decompressorCounts, decompressor, -1);
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1a1dcce8/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java
index 5cacebf..c889a59 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java
@@ -19,15 +19,9 @@ package org.apache.hadoop.io.compress;
 
 import static org.junit.Assert.assertEquals;
 
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
 import java.util.concurrent.Callable;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
 
@@ -35,6 +29,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.HashSet;
+import java.util.Set;
+
 public class TestCodecPool {
   private final String LEASE_COUNT_ERR =
       "Incorrect number of leased (de)compressors";
@@ -61,6 +58,25 @@ public class TestCodecPool {
     CodecPool.returnCompressor(comp1);
     assertEquals(LEASE_COUNT_ERR, 0,
         CodecPool.getLeasedCompressorsCount(codec));
+
+    CodecPool.returnCompressor(comp1);
+    assertEquals(LEASE_COUNT_ERR, 0,
+        CodecPool.getLeasedCompressorsCount(codec));
+  }
+
+  @Test(timeout = 1000)
+  public void testCompressorNotReturnSameInstance() {
+    Compressor comp = CodecPool.getCompressor(codec);
+    CodecPool.returnCompressor(comp);
+    CodecPool.returnCompressor(comp);
+    Set<Compressor> compressors = new HashSet<Compressor>();
+    for (int i = 0; i < 10; ++i) {
+      compressors.add(CodecPool.getCompressor(codec));
+    }
+    assertEquals(10, compressors.size());
+    for (Compressor compressor : compressors) {
+      CodecPool.returnCompressor(compressor);
+    }
   }
 
   @Test(timeout = 1000)
@@ -78,6 +94,10 @@ public class TestCodecPool {
     CodecPool.returnDecompressor(decomp1);
     assertEquals(LEASE_COUNT_ERR, 0,
         CodecPool.getLeasedDecompressorsCount(codec));
+
+    CodecPool.returnDecompressor(decomp1);
+    assertEquals(LEASE_COUNT_ERR, 0,
+        CodecPool.getLeasedCompressorsCount(codec));
   }
 
   @Test(timeout = 1000)
@@ -154,4 +174,19 @@ public class TestCodecPool {
     assertEquals(LEASE_COUNT_ERR, 0,
         CodecPool.getLeasedDecompressorsCount(codec));
   }
+
+  @Test(timeout = 1000)
+  public void testDecompressorNotReturnSameInstance() {
+    Decompressor decomp = CodecPool.getDecompressor(codec);
+    CodecPool.returnDecompressor(decomp);
+    CodecPool.returnDecompressor(decomp);
+    Set<Decompressor> decompressors = new HashSet<Decompressor>();
+    for (int i = 0; i < 10; ++i) {
+      decompressors.add(CodecPool.getDecompressor(codec));
+    }
+    assertEquals(10, decompressors.size());
+    for (Decompressor decompressor : decompressors) {
+      CodecPool.returnDecompressor(decompressor);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1a1dcce8/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index a8ca13f..28595ca 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -236,6 +236,9 @@ Release 2.7.0 - UNRELEASED
 
   BUG FIXES
 
+  MAPREDUCE-5918. LineRecordReader can return the same decompressor to CodecPool
+  multiple times (Sergey Murylev via raviprak)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1a1dcce8/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
index 6b5c26e..ba075e5 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
@@ -284,6 +284,7 @@ public class LineRecordReader implements RecordReader<LongWritable, Text> {
     } finally {
       if (decompressor != null) {
         CodecPool.returnDecompressor(decompressor);
+        decompressor = null;
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1a1dcce8/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
index 880a1a2..42e94ad 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
@@ -232,6 +232,7 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
     } finally {
       if (decompressor != null) {
         CodecPool.returnDecompressor(decompressor);
+        decompressor = null;
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1a1dcce8/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
index 7b664e9..a7a87c9 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
@@ -27,12 +27,17 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.net.URL;
 import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
 
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.Decompressor;
 import org.junit.Test;
 
 public class TestLineRecordReader {
@@ -225,4 +230,36 @@ public class TestLineRecordReader {
 
     assertTrue("BOM is not skipped", skipBOM);
   }
+
+  @Test
+  public void testMultipleClose() throws IOException {
+    URL testFileUrl = getClass().getClassLoader().
+        getResource("recordSpanningMultipleSplits.txt.bz2");
+    assertNotNull("Cannot find recordSpanningMultipleSplits.txt.bz2",
+        testFileUrl);
+    File testFile = new File(testFileUrl.getFile());
+    Path testFilePath = new Path(testFile.getAbsolutePath());
+    long testFileSize = testFile.length();
+    Configuration conf = new Configuration();
+    conf.setInt(org.apache.hadoop.mapreduce.lib.input.
+        LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
+    FileSplit split = new FileSplit(testFilePath, 0, testFileSize,
+        (String[])null);
+
+    LineRecordReader reader = new LineRecordReader(conf, split);
+    LongWritable key = new LongWritable();
+    Text value = new Text();
+    //noinspection StatementWithEmptyBody
+    while (reader.next(key, value)) ;
+    reader.close();
+    reader.close();
+
+    BZip2Codec codec = new BZip2Codec();
+    codec.setConf(conf);
+    Set<Decompressor> decompressors = new HashSet<Decompressor>();
+    for (int i = 0; i < 10; ++i) {
+      decompressors.add(CodecPool.getDecompressor(codec));
+    }
+    assertEquals(10, decompressors.size());
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1a1dcce8/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
index a1b5147..52fdc09 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
@@ -27,10 +27,15 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.net.URL;
 import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
 
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.Decompressor;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
@@ -231,4 +236,37 @@ public class TestLineRecordReader {
 
     assertTrue("BOM is not skipped", skipBOM);
   }
+
+  @Test
+  public void testMultipleClose() throws IOException {
+    URL testFileUrl = getClass().getClassLoader().
+        getResource("recordSpanningMultipleSplits.txt.bz2");
+    assertNotNull("Cannot find recordSpanningMultipleSplits.txt.bz2",
+        testFileUrl);
+    File testFile = new File(testFileUrl.getFile());
+    Path testFilePath = new Path(testFile.getAbsolutePath());
+    long testFileSize = testFile.length();
+    Configuration conf = new Configuration();
+    conf.setInt(org.apache.hadoop.mapreduce.lib.input.
+        LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
+    TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
+
+    // read every record so the decompressor is in use, then close twice below
+    FileSplit split = new FileSplit(testFilePath, 0, testFileSize, null);
+    LineRecordReader reader = new LineRecordReader();
+    reader.initialize(split, context);
+
+    //noinspection StatementWithEmptyBody
+    while (reader.nextKeyValue()) ;
+    reader.close();
+    reader.close();
+
+    BZip2Codec codec = new BZip2Codec();
+    codec.setConf(conf);
+    Set<Decompressor> decompressors = new HashSet<Decompressor>();
+    for (int i = 0; i < 10; ++i) {
+      decompressors.add(CodecPool.getDecompressor(codec));
+    }
+    assertEquals(10, decompressors.size());
+  }
 }