Posted to mapreduce-commits@hadoop.apache.org by jl...@apache.org on 2013/12/10 00:42:37 UTC

svn commit: r1549710 - in /hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/...

Author: jlowe
Date: Mon Dec  9 23:42:36 2013
New Revision: 1549710

URL: http://svn.apache.org/r1549710
Log:
svn merge -c 1549705 FIXES: MAPREDUCE-5656. bzip2 codec can drop records when reading data in splits. Contributed by Jason Lowe

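Why the records were dropped: a splittable bzip2 stream read in
SplittableCompressionCodec.READ_MODE.BYBLOCK reports positions that jump to
compressed-block boundaries, so the old "stop once getFilePosition() passes
end" loop could terminate while the record straddling the split boundary had
been emitted by neither split; the blockEndingInCR and blockEndingInCRThenLF
test inputs added below exercise exactly that seam. The fix routes all line
reading through a new SplitLineReader that lets the record readers ask
whether one more record is still owed after the split end. A minimal sketch
of the base class, with names and signatures taken from the calls in the
diffs below (the committed file is copied unchanged from trunk and is not
shown in this mail):

import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.LineReader;

// Default behavior, suitable for uncompressed streams: once the reader is
// past the split end, no additional record is needed.
public class SplitLineReader extends LineReader {
  public SplitLineReader(InputStream in, byte[] recordDelimiterBytes) {
    super(in, recordDelimiterBytes);
  }

  public SplitLineReader(InputStream in, Configuration conf,
      byte[] recordDelimiterBytes) throws IOException {
    super(in, conf, recordDelimiterBytes);
  }

  public boolean needAdditionalRecordAfterSplit() {
    return false;
  }
}
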
Added:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CompressedSplitLineReader.java
      - copied unchanged from r1549705, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CompressedSplitLineReader.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/SplitLineReader.java
      - copied unchanged from r1549705, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/SplitLineReader.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
      - copied unchanged from r1549705, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
      - copied unchanged from r1549705, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/blockEndingInCR.txt.bz2
      - copied unchanged from r1549705, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/blockEndingInCR.txt.bz2
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/blockEndingInCRThenLF.txt.bz2
      - copied unchanged from r1549705, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/blockEndingInCRThenLF.txt.bz2
Modified:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1549710&r1=1549709&r2=1549710&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Mon Dec  9 23:42:36 2013
@@ -94,6 +94,9 @@ Release 2.4.0 - UNRELEASED
 
     MAPREDUCE-5632. TestRMContainerAllocator#testUpdatedNodes fails (jeagles)
 
+    MAPREDUCE-5656. bzip2 codec can drop records when reading data in splits
+    (jlowe)
+
 Release 2.3.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java?rev=1549710&r1=1549709&r2=1549710&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java Mon Dec  9 23:42:36 2013
@@ -36,6 +36,8 @@ import org.apache.hadoop.io.compress.Com
 import org.apache.hadoop.io.compress.Decompressor;
 import org.apache.hadoop.io.compress.SplitCompressionInputStream;
 import org.apache.hadoop.io.compress.SplittableCompressionCodec;
+import org.apache.hadoop.mapreduce.lib.input.CompressedSplitLineReader;
+import org.apache.hadoop.mapreduce.lib.input.SplitLineReader;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.Log;
 
@@ -52,7 +54,7 @@ public class LineRecordReader implements
   private long start;
   private long pos;
   private long end;
-  private LineReader in;
+  private SplitLineReader in;
   private FSDataInputStream fileIn;
   private final Seekable filePosition;
   int maxLineLength;
@@ -111,17 +113,18 @@ public class LineRecordReader implements
           ((SplittableCompressionCodec)codec).createInputStream(
             fileIn, decompressor, start, end,
             SplittableCompressionCodec.READ_MODE.BYBLOCK);
-        in = new LineReader(cIn, job, recordDelimiter);
+        in = new CompressedSplitLineReader(cIn, job, recordDelimiter);
         start = cIn.getAdjustedStart();
         end = cIn.getAdjustedEnd();
         filePosition = cIn; // take pos from compressed stream
       } else {
-        in = new LineReader(codec.createInputStream(fileIn, decompressor), job, recordDelimiter);
+        in = new SplitLineReader(codec.createInputStream(fileIn,
+            decompressor), job, recordDelimiter);
         filePosition = fileIn;
       }
     } else {
       fileIn.seek(start);
-      in = new LineReader(fileIn, job, recordDelimiter);
+      in = new SplitLineReader(fileIn, job, recordDelimiter);
       filePosition = fileIn;
     }
     // If this is not the first split, we always throw away first record
@@ -141,7 +144,7 @@ public class LineRecordReader implements
   public LineRecordReader(InputStream in, long offset, long endOffset,
       int maxLineLength, byte[] recordDelimiter) {
     this.maxLineLength = maxLineLength;
-    this.in = new LineReader(in, recordDelimiter);
+    this.in = new SplitLineReader(in, recordDelimiter);
     this.start = offset;
     this.pos = offset;
     this.end = endOffset;    
@@ -159,7 +162,7 @@ public class LineRecordReader implements
     throws IOException{
     this.maxLineLength = job.getInt(org.apache.hadoop.mapreduce.lib.input.
       LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
-    this.in = new LineReader(in, job, recordDelimiter);
+    this.in = new SplitLineReader(in, job, recordDelimiter);
     this.start = offset;
     this.pos = offset;
     this.end = endOffset;    
@@ -200,7 +203,7 @@ public class LineRecordReader implements
 
     // We always read one extra line, which lies outside the upper
     // split limit i.e. (end - 1)
-    while (getFilePosition() <= end) {
+    while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
       key.set(pos);
 
       int newSize = in.readLine(value, maxLineLength,

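The while-loop change above is the observable behavior fix for the old
(mapred) API: the reader, not the raw stream position, now decides when the
split is done. A hedged way to see the effect (hypothetical helper, not part
of this commit or the copied tests): read a bzip2 file as two adjacent
splits and check that no record falls through the seam; with this fix the
two-split total matches a single-split read.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;

// Counts records seen across an arbitrary two-way split of a file. The
// split point is a raw byte offset; the codec adjusts it to block
// boundaries internally.
public class SplitSeamCheck {
  static long countRecords(JobConf conf, Path file, long fileLen,
      long splitPoint) throws Exception {
    long total = 0;
    long[][] splits = { { 0, splitPoint },
                        { splitPoint, fileLen - splitPoint } };
    for (long[] s : splits) {
      LineRecordReader reader = new LineRecordReader(conf,
          new FileSplit(file, s[0], s[1], (String[]) null));
      LongWritable key = reader.createKey();
      Text value = reader.createValue();
      while (reader.next(key, value)) {
        total++;   // every record must be counted by exactly one split
      }
      reader.close();
    }
    return total;
  }
}
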
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java?rev=1549710&r1=1549709&r2=1549710&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java Mon Dec  9 23:42:36 2013
@@ -38,7 +38,6 @@ import org.apache.hadoop.io.compress.Dec
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.util.LineReader;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.Log;
 
@@ -55,7 +54,7 @@ public class LineRecordReader extends Re
   private long start;
   private long pos;
   private long end;
-  private LineReader in;
+  private SplitLineReader in;
   private FSDataInputStream fileIn;
   private Seekable filePosition;
   private int maxLineLength;
@@ -94,33 +93,19 @@ public class LineRecordReader extends Re
           ((SplittableCompressionCodec)codec).createInputStream(
             fileIn, decompressor, start, end,
             SplittableCompressionCodec.READ_MODE.BYBLOCK);
-        if (null == this.recordDelimiterBytes){
-          in = new LineReader(cIn, job);
-        } else {
-          in = new LineReader(cIn, job, this.recordDelimiterBytes);
-        }
-
+        in = new CompressedSplitLineReader(cIn, job,
+            this.recordDelimiterBytes);
         start = cIn.getAdjustedStart();
         end = cIn.getAdjustedEnd();
         filePosition = cIn;
       } else {
-        if (null == this.recordDelimiterBytes) {
-          in = new LineReader(codec.createInputStream(fileIn, decompressor),
-              job);
-        } else {
-          in = new LineReader(codec.createInputStream(fileIn,
-              decompressor), job, this.recordDelimiterBytes);
-        }
+        in = new SplitLineReader(codec.createInputStream(fileIn,
+            decompressor), job, this.recordDelimiterBytes);
         filePosition = fileIn;
       }
     } else {
       fileIn.seek(start);
-      if (null == this.recordDelimiterBytes){
-        in = new LineReader(fileIn, job);
-      } else {
-        in = new LineReader(fileIn, job, this.recordDelimiterBytes);
-      }
-
+      in = new SplitLineReader(fileIn, job, this.recordDelimiterBytes);
       filePosition = fileIn;
     }
     // If this is not the first split, we always throw away first record
@@ -160,7 +145,7 @@ public class LineRecordReader extends Re
     int newSize = 0;
     // We always read one extra line, which lies outside the upper
     // split limit i.e. (end - 1)
-    while (getFilePosition() <= end) {
+    while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
       newSize = in.readLine(value, maxLineLength,
           Math.max(maxBytesToConsume(pos), maxLineLength));
       pos += newSize;
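
Two notes on the new (mapreduce) API changes above. The null checks on
this.recordDelimiterBytes could be collapsed because LineReader, and
therefore SplitLineReader, already falls back to default CR/LF handling
when the delimiter array is null. And CompressedSplitLineReader itself is
copied unchanged from trunk, so it does not appear in this diff;
illustratively (a sketch only, not the committed class), its role is to
override the hook so the new loop condition can fire:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.SplitCompressionInputStream;

// Sketch of a split-aware reader for block-compressed input. A real
// implementation would set needAdditionalRecord while refilling its
// buffer, i.e. when the compressed stream position crosses the adjusted
// split end before the current record's delimiter has been seen.
class CompressedSplitLineReaderSketch extends SplitLineReader {
  private boolean needAdditionalRecord = false;

  CompressedSplitLineReaderSketch(SplitCompressionInputStream in,
      Configuration conf, byte[] recordDelimiterBytes) throws IOException {
    super(in, conf, recordDelimiterBytes);
  }

  // Queried by both LineRecordReaders once getFilePosition() > end.
  @Override
  public boolean needAdditionalRecordAfterSplit() {
    return needAdditionalRecord;
  }
}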