You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text version.
Posted to commits@crunch.apache.org by mk...@apache.org on 2014/06/27 03:25:18 UTC

git commit: CRUNCH-429: Fix CSVInputFormat

Repository: crunch
Updated Branches:
  refs/heads/master 8027f706a -> ace392bde


CRUNCH-429: Fix CSVInputFormat

Signed-off-by: Micah Whitacre <mk...@apache.org>

CRUNCH-429: Fixed spelling error, handled potential NPE, and moved FileSystem retrieval outside of the for loop.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/ace392bd
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/ace392bd
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/ace392bd

Branch: refs/heads/master
Commit: ace392bde724273b1d300f7b01c4648e9e1b9060
Parents: 8027f70
Author: Mac Champion <ma...@cerner.com>
Authored: Tue Jun 24 12:30:34 2014 -0500
Committer: Micah Whitacre <mk...@apache.org>
Committed: Thu Jun 26 16:12:25 2014 -0500

----------------------------------------------------------------------
 .../crunch/io/text/csv/CSVFileSourceIT.java     | 66 ++++++++++++--------
 .../crunch/io/text/csv/CSVInputFormat.java      | 47 ++++++++------
 2 files changed, 66 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/ace392bd/crunch-core/src/it/java/org/apache/crunch/io/text/csv/CSVFileSourceIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/text/csv/CSVFileSourceIT.java b/crunch-core/src/it/java/org/apache/crunch/io/text/csv/CSVFileSourceIT.java
index a81f78d..ba8e193 100644
--- a/crunch-core/src/it/java/org/apache/crunch/io/text/csv/CSVFileSourceIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/io/text/csv/CSVFileSourceIT.java
@@ -17,19 +17,18 @@
  */
 package org.apache.crunch.io.text.csv;
 
-import java.io.File;
-import java.io.FileInputStream;
+import static org.junit.Assert.assertTrue;
+
 import java.io.IOException;
 import java.util.Collection;
-import static org.junit.Assert.*;
 
 import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
 import org.apache.crunch.Pipeline;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.test.TemporaryPaths;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
 import org.junit.Rule;
 import org.junit.Test;
 
@@ -39,14 +38,30 @@ public class CSVFileSourceIT {
 
   @Test
   public void testVanillaCSV() throws Exception {
-    String[] expectedFileContents = { "1,2,3,4", "5,6,7,8", "9,10,11", "12,13,14" };
+    final String[] expectedFileContents = { "1,2,3,4", "5,6,7,8", "9,10,11", "12,13,14" };
+
+    final String vanillaCSVFile = tmpDir.copyResourceFileName("vanilla.csv");
+    final Pipeline pipeline = new MRPipeline(CSVFileSourceIT.class, tmpDir.getDefaultConfiguration());
+    final PCollection<String> csvLines = pipeline.read(new CSVFileSource(new Path(vanillaCSVFile)));
+
+    final Collection<String> csvLinesList = csvLines.asCollection().getValue();
+
+    for (int i = 0; i < expectedFileContents.length; i++) {
+      assertTrue(csvLinesList.contains(expectedFileContents[i]));
+    }
+  }
+
+  @Test
+  public void testVanillaCSVWithAdditionalActions() throws Exception {
+    final String[] expectedFileContents = { "1,2,3,4", "5,6,7,8", "9,10,11", "12,13,14" };
 
-    String vanillaCSVFile = tmpDir.copyResourceFileName("vanilla.csv");
-    Pipeline pipeline = new MRPipeline(CSVFileSourceIT.class, tmpDir.getDefaultConfiguration());
-    PCollection<String> csvLines = pipeline.read(new CSVFileSource(new Path(vanillaCSVFile)));
-    pipeline.run();
+    final String vanillaCSVFile = tmpDir.copyResourceFileName("vanilla.csv");
+    final Pipeline pipeline = new MRPipeline(CSVFileSourceIT.class, tmpDir.getDefaultConfiguration());
+    final PCollection<String> csvLines = pipeline.read(new CSVFileSource(new Path(vanillaCSVFile)));
 
-    Collection<String> csvLinesList = csvLines.asCollection().getValue();
+    final PTable<String, Long> countTable = csvLines.count();
+    final PCollection<String> csvLines2 = countTable.keys();
+    final Collection<String> csvLinesList = csvLines2.asCollection().getValue();
 
     for (int i = 0; i < expectedFileContents.length; i++) {
       assertTrue(csvLinesList.contains(expectedFileContents[i]));
@@ -55,16 +70,15 @@ public class CSVFileSourceIT {
 
   @Test
   public void testCSVWithNewlines() throws Exception {
-    String[] expectedFileContents = {
+    final String[] expectedFileContents = {
         "\"Champion, Mac\",\"1234 Hoth St.\n\tApartment 101\n\tAtlanta, GA\n\t64086\",\"30\",\"M\",\"5/28/2010 12:00:00 AM\",\"Just some guy\"",
         "\"Champion, Mac\",\"5678 Tatooine Rd. Apt 5, Mobile, AL 36608\",\"30\",\"M\",\"Some other date\",\"short description\"" };
 
-    String csvWithNewlines = tmpDir.copyResourceFileName("withNewlines.csv");
-    Pipeline pipeline = new MRPipeline(CSVFileSourceIT.class, tmpDir.getDefaultConfiguration());
-    PCollection<String> csvLines = pipeline.read(new CSVFileSource(new Path(csvWithNewlines)));
-    pipeline.run();
+    final String csvWithNewlines = tmpDir.copyResourceFileName("withNewlines.csv");
+    final Pipeline pipeline = new MRPipeline(CSVFileSourceIT.class, tmpDir.getDefaultConfiguration());
+    final PCollection<String> csvLines = pipeline.read(new CSVFileSource(new Path(csvWithNewlines)));
 
-    Collection<String> csvLinesList = csvLines.asCollection().getValue();
+    final Collection<String> csvLinesList = csvLines.asCollection().getValue();
 
     for (int i = 0; i < expectedFileContents.length; i++) {
       assertTrue(csvLinesList.contains(expectedFileContents[i]));
@@ -77,18 +91,17 @@ public class CSVFileSourceIT {
    */
   @Test
   public void testCSVWithCustomQuoteAndNewlines() throws IOException {
-    String[] expectedFileContents = {
+    final String[] expectedFileContents = {
         "*Champion, Mac*,*1234 Hoth St.\n\tApartment 101\n\tAtlanta, GA\n\t64086*,*30*,*M*,*5/28/2010 12:00:00 AM*,*Just some guy*",
         "*Mac, Champion*,*5678 Tatooine Rd. Apt 5, Mobile, AL 36608*,*30*,*M*,*Some other date*,*short description*" };
 
-    String csvWithNewlines = tmpDir.copyResourceFileName("customQuoteCharWithNewlines.csv");
-    Pipeline pipeline = new MRPipeline(CSVFileSourceIT.class, tmpDir.getDefaultConfiguration());
-    PCollection<String> csvLines = pipeline.read(new CSVFileSource(new Path(csvWithNewlines),
+    final String csvWithNewlines = tmpDir.copyResourceFileName("customQuoteCharWithNewlines.csv");
+    final Pipeline pipeline = new MRPipeline(CSVFileSourceIT.class, tmpDir.getDefaultConfiguration());
+    final PCollection<String> csvLines = pipeline.read(new CSVFileSource(new Path(csvWithNewlines),
         CSVLineReader.DEFAULT_BUFFER_SIZE, CSVLineReader.DEFAULT_INPUT_FILE_ENCODING, '*', '*',
         CSVLineReader.DEFAULT_ESCAPE_CHARACTER));
-    pipeline.run();
 
-    Collection<String> csvLinesList = csvLines.asCollection().getValue();
+    final Collection<String> csvLinesList = csvLines.asCollection().getValue();
 
     for (int i = 0; i < expectedFileContents.length; i++) {
       assertTrue(csvLinesList.contains(expectedFileContents[i]));
@@ -105,13 +118,12 @@ public class CSVFileSourceIT {
   public void testBrokenLineParsingInChinese() throws IOException {
     final String[] expectedChineseLines = { "您好我叫马克,我从亚拉巴马州来,我是软件工程师,我二十八岁", "我有一个宠物,它是一个小猫,它六岁,它很漂亮",
         "我喜欢吃饭,“我觉得这个饭最好\n*蛋糕\n*包子\n*冰淇淋\n*啤酒“,他们都很好,我也很喜欢奶酪但它是不健康的", "我是男的,我的头发很短,我穿蓝色的裤子,“我穿黑色的、“衣服”" };
-    String chineseLines = tmpDir.copyResourceFileName("brokenChineseLines.csv");
+    final String chineseLines = tmpDir.copyResourceFileName("brokenChineseLines.csv");
 
-    Pipeline pipeline = new MRPipeline(CSVFileSourceIT.class, tmpDir.getDefaultConfiguration());
-    PCollection<String> csvLines = pipeline.read(new CSVFileSource(new Path(chineseLines),
+    final Pipeline pipeline = new MRPipeline(CSVFileSourceIT.class, tmpDir.getDefaultConfiguration());
+    final PCollection<String> csvLines = pipeline.read(new CSVFileSource(new Path(chineseLines),
         CSVLineReader.DEFAULT_BUFFER_SIZE, CSVLineReader.DEFAULT_INPUT_FILE_ENCODING, '“', '”', '、'));
-    pipeline.run();
-    Collection<String> csvLinesList = csvLines.asCollection().getValue();
+    final Collection<String> csvLinesList = csvLines.asCollection().getValue();
     for (int i = 0; i < expectedChineseLines.length; i++) {
       assertTrue(csvLinesList.contains(expectedChineseLines[i]));
     }

http://git-wip-us.apache.org/repos/asf/crunch/blob/ace392bd/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVInputFormat.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVInputFormat.java b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVInputFormat.java
index 5d5abe3..867b704 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVInputFormat.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVInputFormat.java
@@ -21,6 +21,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -28,7 +30,6 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
@@ -43,12 +44,13 @@ import com.google.common.annotations.VisibleForTesting;
  * format deals with the fact that CSV files can potentially have multiple lines
  * within fields which should all be treated as one record.
  */
-public class CSVInputFormat extends FileInputFormat<LongWritable, Text> {
+public class CSVInputFormat extends FileInputFormat<LongWritable, Text> implements Configurable {
   private int bufferSize;
   private String inputFileEncoding;
   private char openQuoteChar;
   private char closeQuoteChar;
   private char escapeChar;
+  private Configuration configuration;
 
   /**
    * This method is used by crunch to get an instance of {@link CSVRecordReader}
@@ -57,10 +59,7 @@ public class CSVInputFormat extends FileInputFormat<LongWritable, Text> {
    *          the {@link InputSplit} that will be assigned to the record reader
    * @param context
    *          the {@TaskAttemptContext} for the job
-   * @return an instance of {@link CSVRecordReader} created using
-   *         {@link CSVInputFormat#getSeparatorChar()},
-   *         {@link CSVInputFormat#getQuoteChar()}, and
-   *         {@link CSVInputFormat#getEscapeChar()}.
+   * @return an instance of {@link CSVRecordReader} created using configured separator, quote, and escape characters.
    */
   @Override
   public RecordReader<LongWritable, Text> createRecordReader(final InputSplit split, final TaskAttemptContext context) {
@@ -83,17 +82,18 @@ public class CSVInputFormat extends FileInputFormat<LongWritable, Text> {
     final long splitSize = job.getConfiguration().getLong("csv.input.split.size", 67108864);
     final List<InputSplit> splits = new ArrayList<InputSplit>();
     final Path[] paths = FileUtil.stat2Paths(listStatus(job).toArray(new FileStatus[0]));
-    FileSystem fileSystem = null;
+    FileSystem fileSystem = FileSystem.get(job.getConfiguration());
     FSDataInputStream inputStream = null;
     try {
       for (final Path path : paths) {
-        fileSystem = path.getFileSystem(job.getConfiguration());
         inputStream = fileSystem.open(path);
         splits.addAll(getSplitsForFile(splitSize, fileSystem.getFileStatus(path).getLen(), path, inputStream));
       }
       return splits;
     } finally {
-      inputStream.close();
+      if(inputStream != null) {
+        inputStream.close();
+      }
     }
   }
 
@@ -166,44 +166,51 @@ public class CSVInputFormat extends FileInputFormat<LongWritable, Text> {
     return splitsList;
   }
 
+  @Override
+  public Configuration getConf() {
+    return configuration;
+  }
+
+  @Override
+  public void setConf(final Configuration conf) {
+    configuration = conf;
+    configure();
+  }
+
   /**
-   * This method will read the configuration that is set in
+   * This method will read the configuration options that were set in
    * {@link CSVFileSource}'s private getBundle() method
-   * 
-   * @param jobConf
-   *          The {@code JobConf} instance from which the CSV configuration
-   *          parameters will be read, if necessary.
    */
-  public void configure(JobConf jobConf) {
-    String bufferValue = jobConf.get(CSVFileSource.CSV_BUFFER_SIZE);
+  public void configure() {
+    final String bufferValue = this.configuration.get(CSVFileSource.CSV_BUFFER_SIZE);
     if ("".equals(bufferValue)) {
       bufferSize = CSVLineReader.DEFAULT_BUFFER_SIZE;
     } else {
       bufferSize = Integer.parseInt(bufferValue);
     }
 
-    String inputFileEncodingValue = jobConf.get(CSVFileSource.CSV_INPUT_FILE_ENCODING);
+    final String inputFileEncodingValue = this.configuration.get(CSVFileSource.CSV_INPUT_FILE_ENCODING);
     if ("".equals(inputFileEncodingValue)) {
       inputFileEncoding = CSVLineReader.DEFAULT_INPUT_FILE_ENCODING;
     } else {
       inputFileEncoding = inputFileEncodingValue;
     }
 
-    String openQuoteCharValue = jobConf.get(CSVFileSource.CSV_OPEN_QUOTE_CHAR);
+    final String openQuoteCharValue = this.configuration.get(CSVFileSource.CSV_OPEN_QUOTE_CHAR);
     if ("".equals(openQuoteCharValue)) {
       openQuoteChar = CSVLineReader.DEFAULT_QUOTE_CHARACTER;
     } else {
       openQuoteChar = openQuoteCharValue.charAt(0);
     }
 
-    String closeQuoteCharValue = jobConf.get(CSVFileSource.CSV_CLOSE_QUOTE_CHAR);
+    final String closeQuoteCharValue = this.configuration.get(CSVFileSource.CSV_CLOSE_QUOTE_CHAR);
     if ("".equals(closeQuoteCharValue)) {
       closeQuoteChar = CSVLineReader.DEFAULT_QUOTE_CHARACTER;
     } else {
       closeQuoteChar = closeQuoteCharValue.charAt(0);
     }
 
-    String escapeCharValue = jobConf.get(CSVFileSource.CSV_ESCAPE_CHAR);
+    final String escapeCharValue = this.configuration.get(CSVFileSource.CSV_ESCAPE_CHAR);
     if ("".equals(escapeCharValue)) {
       escapeChar = CSVLineReader.DEFAULT_ESCAPE_CHARACTER;
     } else {