You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by jl...@apache.org on 2014/05/28 21:53:08 UTC

svn commit: r1598115 - in /hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-core/ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-...

Author: jlowe
Date: Wed May 28 19:53:08 2014
New Revision: 1598115

URL: http://svn.apache.org/r1598115
Log:
svn merge -c 1598111 FIXES: MAPREDUCE-5862. Line records longer than 2x split size aren't handled correctly. Contributed by bc Wong

Added:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/recordSpanningMultipleSplits.txt
      - copied unchanged from r1598111, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/recordSpanningMultipleSplits.txt
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/recordSpanningMultipleSplits.txt.bz2
      - copied unchanged from r1598111, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/recordSpanningMultipleSplits.txt.bz2
Modified:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1598115&r1=1598114&r2=1598115&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Wed May 28 19:53:08 2014
@@ -99,6 +99,9 @@ Release 2.5.0 - UNRELEASED
     MAPREDUCE-5309. 2.0.4 JobHistoryParser can't parse certain failed job
     history files generated by 2.0.3 history server (Rushabh S Shah via jlowe)
 
+    MAPREDUCE-5862. Line records longer than 2x split size aren't handled
+    correctly (bc Wong via jlowe)
+
 Release 2.4.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml?rev=1598115&r1=1598114&r2=1598115&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml Wed May 28 19:53:08 2014
@@ -85,6 +85,15 @@
           </execution>
         </executions>
       </plugin>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+        <configuration>
+          <excludes>
+            <exclude>src/test/resources/recordSpanningMultipleSplits.txt</exclude>
+          </excludes>
+        </configuration>
+      </plugin>
     </plugins>
   </build>
 </project>

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java?rev=1598115&r1=1598114&r2=1598115&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java Wed May 28 19:53:08 2014
@@ -184,7 +184,7 @@ public class LineRecordReader implements
   private int maxBytesToConsume(long pos) {
     return isCompressedInput()
       ? Integer.MAX_VALUE
-      : (int) Math.min(Integer.MAX_VALUE, end - pos);
+      : (int) Math.max(Math.min(Integer.MAX_VALUE, end - pos), maxLineLength);
   }
 
   private long getFilePosition() throws IOException {
@@ -206,8 +206,7 @@ public class LineRecordReader implements
     while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
       key.set(pos);
 
-      int newSize = in.readLine(value, maxLineLength,
-          Math.max(maxBytesToConsume(pos), maxLineLength));
+      int newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
       if (newSize == 0) {
         return false;
       }

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java?rev=1598115&r1=1598114&r2=1598115&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java Wed May 28 19:53:08 2014
@@ -121,7 +121,7 @@ public class LineRecordReader extends Re
   private int maxBytesToConsume(long pos) {
     return isCompressedInput
       ? Integer.MAX_VALUE
-      : (int) Math.min(Integer.MAX_VALUE, end - pos);
+      : (int) Math.max(Math.min(Integer.MAX_VALUE, end - pos), maxLineLength);
   }
 
   private long getFilePosition() throws IOException {
@@ -146,8 +146,7 @@ public class LineRecordReader extends Re
     // We always read one extra line, which lies outside the upper
     // split limit i.e. (end - 1)
     while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
-      newSize = in.readLine(value, maxLineLength,
-          Math.max(maxBytesToConsume(pos), maxLineLength));
+      newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
       pos += newSize;
       if (newSize < maxLineLength) {
         break;

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java?rev=1598115&r1=1598114&r2=1598115&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java Wed May 28 19:53:08 2014
@@ -23,9 +23,12 @@ import static org.junit.Assert.assertNot
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.net.URL;
+import java.util.ArrayList;
 
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
@@ -97,4 +100,92 @@ public class TestLineRecordReader {
     // character is a linefeed
     testSplitRecords("blockEndingInCRThenLF.txt.bz2", 136498);
   }
+
+  // Use the LineRecordReader to read records from the file
+  public ArrayList<String> readRecords(URL testFileUrl, int splitSize)
+      throws IOException {
+
+    // Set up context
+    File testFile = new File(testFileUrl.getFile());
+    long testFileSize = testFile.length();
+    Path testFilePath = new Path(testFile.getAbsolutePath());
+    Configuration conf = new Configuration();
+    conf.setInt("io.file.buffer.size", 1);
+
+    // Gather the records returned by the record reader
+    ArrayList<String> records = new ArrayList<String>();
+
+    long offset = 0;
+    LongWritable key = new LongWritable();
+    Text value = new Text();
+    while (offset < testFileSize) {
+      FileSplit split =
+          new FileSplit(testFilePath, offset, splitSize, (String[]) null);
+      LineRecordReader reader = new LineRecordReader(conf, split);
+
+      while (reader.next(key, value)) {
+        records.add(value.toString());
+      }
+      offset += splitSize;
+    }
+    return records;
+  }
+
+  // Gather the records by just splitting on new lines
+  public String[] readRecordsDirectly(URL testFileUrl, boolean bzip)
+      throws IOException {
+    int MAX_DATA_SIZE = 1024 * 1024;
+    byte[] data = new byte[MAX_DATA_SIZE];
+    FileInputStream fis = new FileInputStream(testFileUrl.getFile());
+    int count;
+    if (bzip) {
+      BZip2CompressorInputStream bzIn = new BZip2CompressorInputStream(fis);
+      count = bzIn.read(data);
+      bzIn.close();
+    } else {
+      count = fis.read(data);
+    }
+    fis.close();
+    assertTrue("Test file data too big for buffer", count < data.length);
+    return new String(data, 0, count, "UTF-8").split("\n");
+  }
+
+  public void checkRecordSpanningMultipleSplits(String testFile,
+                                                int splitSize,
+                                                boolean bzip)
+      throws IOException {
+    URL testFileUrl = getClass().getClassLoader().getResource(testFile);
+    ArrayList<String> records = readRecords(testFileUrl, splitSize);
+    String[] actuals = readRecordsDirectly(testFileUrl, bzip);
+
+    assertEquals("Wrong number of records", actuals.length, records.size());
+
+    boolean hasLargeRecord = false;
+    for (int i = 0; i < actuals.length; ++i) {
+      assertEquals(actuals[i], records.get(i));
+      if (actuals[i].length() > 2 * splitSize) {
+        hasLargeRecord = true;
+      }
+    }
+
+    assertTrue("Invalid test data. Doesn't have a large enough record",
+               hasLargeRecord);
+  }
+
+  @Test
+  public void testRecordSpanningMultipleSplits()
+      throws IOException {
+    checkRecordSpanningMultipleSplits("recordSpanningMultipleSplits.txt",
+        10, false);
+  }
+
+  @Test
+  public void testRecordSpanningMultipleSplitsCompressed()
+      throws IOException {
+    // The file is generated with bz2 block size of 100k. The split size
+    // needs to be larger than that for the CompressedSplitLineReader to
+    // work.
+    checkRecordSpanningMultipleSplits("recordSpanningMultipleSplits.txt.bz2",
+        200 * 1000, true);
+  }
 }

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java?rev=1598115&r1=1598114&r2=1598115&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java Wed May 28 19:53:08 2014
@@ -23,9 +23,12 @@ import static org.junit.Assert.assertNot
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.net.URL;
+import java.util.ArrayList;
 
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -101,4 +104,93 @@ public class TestLineRecordReader {
     // character is a linefeed
     testSplitRecords("blockEndingInCRThenLF.txt.bz2", 136498);
   }
+
+  // Use the LineRecordReader to read records from the file
+  public ArrayList<String> readRecords(URL testFileUrl, int splitSize)
+      throws IOException {
+
+    // Set up context
+    File testFile = new File(testFileUrl.getFile());
+    long testFileSize = testFile.length();
+    Path testFilePath = new Path(testFile.getAbsolutePath());
+    Configuration conf = new Configuration();
+    conf.setInt("io.file.buffer.size", 1);
+    TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
+
+    // Gather the records returned by the record reader
+    ArrayList<String> records = new ArrayList<String>();
+
+    long offset = 0;
+    while (offset < testFileSize) {
+      FileSplit split = new FileSplit(testFilePath, offset, splitSize, null);
+      LineRecordReader reader = new LineRecordReader();
+      reader.initialize(split, context);
+
+      while (reader.nextKeyValue()) {
+        records.add(reader.getCurrentValue().toString());
+      }
+      offset += splitSize;
+    }
+    return records;
+  }
+
+  // Gather the records by just splitting on new lines
+  public String[] readRecordsDirectly(URL testFileUrl, boolean bzip)
+      throws IOException {
+    int MAX_DATA_SIZE = 1024 * 1024;
+    byte[] data = new byte[MAX_DATA_SIZE];
+    FileInputStream fis = new FileInputStream(testFileUrl.getFile());
+    int count;
+    if (bzip) {
+      BZip2CompressorInputStream bzIn = new BZip2CompressorInputStream(fis);
+      count = bzIn.read(data);
+      bzIn.close();
+    } else {
+      count = fis.read(data);
+    }
+    fis.close();
+    assertTrue("Test file data too big for buffer", count < data.length);
+    return new String(data, 0, count, "UTF-8").split("\n");
+  }
+
+  public void checkRecordSpanningMultipleSplits(String testFile,
+                                                int splitSize,
+                                                boolean bzip)
+      throws IOException {
+    URL testFileUrl = getClass().getClassLoader().getResource(testFile);
+    ArrayList<String> records = readRecords(testFileUrl, splitSize);
+    String[] actuals = readRecordsDirectly(testFileUrl, bzip);
+
+    assertEquals("Wrong number of records", actuals.length, records.size());
+
+    boolean hasLargeRecord = false;
+    for (int i = 0; i < actuals.length; ++i) {
+      assertEquals(actuals[i], records.get(i));
+      if (actuals[i].length() > 2 * splitSize) {
+        hasLargeRecord = true;
+      }
+    }
+
+    assertTrue("Invalid test data. Doesn't have a large enough record",
+               hasLargeRecord);
+  }
+
+  @Test
+  public void testRecordSpanningMultipleSplits()
+      throws IOException {
+    checkRecordSpanningMultipleSplits("recordSpanningMultipleSplits.txt",
+                                      10,
+                                      false);
+  }
+
+  @Test
+  public void testRecordSpanningMultipleSplitsCompressed()
+      throws IOException {
+    // The file is generated with bz2 block size of 100k. The split size
+    // needs to be larger than that for the CompressedSplitLineReader to
+    // work.
+    checkRecordSpanningMultipleSplits("recordSpanningMultipleSplits.txt.bz2",
+                                      200 * 1000,
+                                      true);
+  }
 }