You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by mk...@apache.org on 2014/05/21 19:53:12 UTC

git commit: CRUNCH-362: Add CSVFileSource

Repository: crunch
Updated Branches:
  refs/heads/master 657cdd6b3 -> ce79a7a1b


CRUNCH-362: Add CSVFileSource

Signed-off-by: Micah Whitacre <mk...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/ce79a7a1
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/ce79a7a1
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/ce79a7a1

Branch: refs/heads/master
Commit: ce79a7a1be6cc4670d4fcf58184028a0663ef139
Parents: 657cdd6
Author: Mac Champion <gi...@gmail.com>
Authored: Wed Mar 26 16:22:33 2014 -0500
Committer: Micah Whitacre <mk...@apache.org>
Committed: Wed May 21 12:50:46 2014 -0500

----------------------------------------------------------------------
 .../crunch/io/text/csv/CSVFileSourceIT.java     | 119 ++++++
 .../io/text/csv/CSVFileReaderFactory.java       |  94 +++++
 .../crunch/io/text/csv/CSVFileSource.java       | 207 ++++++++++
 .../crunch/io/text/csv/CSVInputFormat.java      | 213 ++++++++++
 .../crunch/io/text/csv/CSVLineReader.java       | 406 +++++++++++++++++++
 .../crunch/io/text/csv/CSVReadableData.java     |  75 ++++
 .../crunch/io/text/csv/CSVRecordIterator.java   | 119 ++++++
 .../crunch/io/text/csv/CSVRecordReader.java     | 202 +++++++++
 .../crunch/io/text/csv/CSVLineReaderTest.java   |  78 ++++
 .../io/text/csv/CSVRecordIteratorTest.java      |  75 ++++
 crunch-test/src/main/resources/UTF8.csv         |   1 +
 .../src/main/resources/brokenChineseLines.csv   |   8 +
 .../resources/customQuoteCharWithNewlines.csv   |   5 +
 crunch-test/src/main/resources/vanilla.csv      |   4 +
 crunch-test/src/main/resources/withNewlines.csv |   5 +
 pom.xml                                         |   1 +
 16 files changed, 1612 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/ce79a7a1/crunch-core/src/it/java/org/apache/crunch/io/text/csv/CSVFileSourceIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/text/csv/CSVFileSourceIT.java b/crunch-core/src/it/java/org/apache/crunch/io/text/csv/CSVFileSourceIT.java
new file mode 100644
index 0000000..a81f78d
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/io/text/csv/CSVFileSourceIT.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.text.csv;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Collection;
+import static org.junit.Assert.*;
+
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class CSVFileSourceIT {
+  @Rule
+  public transient TemporaryPath tmpDir = TemporaryPaths.create();
+
+  @Test
+  public void testVanillaCSV() throws Exception {
+    String[] expectedFileContents = { "1,2,3,4", "5,6,7,8", "9,10,11", "12,13,14" };
+
+    String vanillaCSVFile = tmpDir.copyResourceFileName("vanilla.csv");
+    Pipeline pipeline = new MRPipeline(CSVFileSourceIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<String> csvLines = pipeline.read(new CSVFileSource(new Path(vanillaCSVFile)));
+    pipeline.run();
+
+    Collection<String> csvLinesList = csvLines.asCollection().getValue();
+
+    for (int i = 0; i < expectedFileContents.length; i++) {
+      assertTrue(csvLinesList.contains(expectedFileContents[i]));
+    }
+  }
+
+  @Test
+  public void testCSVWithNewlines() throws Exception {
+    String[] expectedFileContents = {
+        "\"Champion, Mac\",\"1234 Hoth St.\n\tApartment 101\n\tAtlanta, GA\n\t64086\",\"30\",\"M\",\"5/28/2010 12:00:00 AM\",\"Just some guy\"",
+        "\"Champion, Mac\",\"5678 Tatooine Rd. Apt 5, Mobile, AL 36608\",\"30\",\"M\",\"Some other date\",\"short description\"" };
+
+    String csvWithNewlines = tmpDir.copyResourceFileName("withNewlines.csv");
+    Pipeline pipeline = new MRPipeline(CSVFileSourceIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<String> csvLines = pipeline.read(new CSVFileSource(new Path(csvWithNewlines)));
+    pipeline.run();
+
+    Collection<String> csvLinesList = csvLines.asCollection().getValue();
+
+    for (int i = 0; i < expectedFileContents.length; i++) {
+      assertTrue(csvLinesList.contains(expectedFileContents[i]));
+    }
+  }
+
+  /**
+   * This test is to make sure that custom char values set in the FileSource are
+   * successfully picked up and used later by the InputFormat.
+   */
+  @Test
+  public void testCSVWithCustomQuoteAndNewlines() throws IOException {
+    String[] expectedFileContents = {
+        "*Champion, Mac*,*1234 Hoth St.\n\tApartment 101\n\tAtlanta, GA\n\t64086*,*30*,*M*,*5/28/2010 12:00:00 AM*,*Just some guy*",
+        "*Mac, Champion*,*5678 Tatooine Rd. Apt 5, Mobile, AL 36608*,*30*,*M*,*Some other date*,*short description*" };
+
+    String csvWithNewlines = tmpDir.copyResourceFileName("customQuoteCharWithNewlines.csv");
+    Pipeline pipeline = new MRPipeline(CSVFileSourceIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<String> csvLines = pipeline.read(new CSVFileSource(new Path(csvWithNewlines),
+        CSVLineReader.DEFAULT_BUFFER_SIZE, CSVLineReader.DEFAULT_INPUT_FILE_ENCODING, '*', '*',
+        CSVLineReader.DEFAULT_ESCAPE_CHARACTER));
+    pipeline.run();
+
+    Collection<String> csvLinesList = csvLines.asCollection().getValue();
+
+    for (int i = 0; i < expectedFileContents.length; i++) {
+      assertTrue(csvLinesList.contains(expectedFileContents[i]));
+    }
+  }
+
+  /**
+   * This is effectively a mirror the above address tests, but using Chinese
+   * characters, even for the quotation marks and escape characters.
+   * 
+   * @throws IOException
+   */
+  @Test
+  public void testBrokenLineParsingInChinese() throws IOException {
+    final String[] expectedChineseLines = { "您好我叫马克,我从亚拉巴马州来,我是软件工程师,我二十八岁", "我有一个宠物,它是一个小猫,它六岁,它很漂亮",
+        "我喜欢吃饭,“我觉得这个饭最好\n*蛋糕\n*包子\n*冰淇淋\n*啤酒“,他们都很好,我也很喜欢奶酪但它是不健康的", "我是男的,我的头发很短,我穿蓝色的裤子,“我穿黑色的、“衣服”" };
+    String chineseLines = tmpDir.copyResourceFileName("brokenChineseLines.csv");
+
+    Pipeline pipeline = new MRPipeline(CSVFileSourceIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<String> csvLines = pipeline.read(new CSVFileSource(new Path(chineseLines),
+        CSVLineReader.DEFAULT_BUFFER_SIZE, CSVLineReader.DEFAULT_INPUT_FILE_ENCODING, '“', '”', '、'));
+    pipeline.run();
+    Collection<String> csvLinesList = csvLines.asCollection().getValue();
+    for (int i = 0; i < expectedChineseLines.length; i++) {
+      assertTrue(csvLinesList.contains(expectedChineseLines[i]));
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/ce79a7a1/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileReaderFactory.java b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileReaderFactory.java
new file mode 100644
index 0000000..c1c687e
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileReaderFactory.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.text.csv;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.io.FileReaderFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.collect.Iterators;
+
+/**
+ * The {@code FileReaderFactory} instance that is responsible for building a
+ * {@code CSVRecordIterator}
+ */
+public class CSVFileReaderFactory implements FileReaderFactory<String> {
+  private static final Log LOG = LogFactory.getLog(CSVFileReaderFactory.class);
+  private int bufferSize;
+  private String inputFileEncoding;
+  private char openQuoteChar;
+  private char closeQuoteChar;
+  private char escapeChar;
+
+  /**
+   * Creates a new {@code CSVFileReaderFactory} instance with default
+   * configuration
+   */
+  CSVFileReaderFactory() {
+    this(CSVLineReader.DEFAULT_BUFFER_SIZE, CSVLineReader.DEFAULT_INPUT_FILE_ENCODING,
+        CSVLineReader.DEFAULT_QUOTE_CHARACTER, CSVLineReader.DEFAULT_QUOTE_CHARACTER,
+        CSVLineReader.DEFAULT_ESCAPE_CHARACTER);
+  }
+
+  /**
+   * Creates a new {@code CSVFileReaderFactory} instance with custon
+   * configuration
+   * 
+   * @param bufferSize
+   *          The size of the buffer to be used in the underlying
+   *          {@code CSVLineReader}
+   * @param inputFileEncoding
+   *          The the encoding of the input file to be read by the underlying
+   *          {@code CSVLineReader}
+   * @param openQuoteChar
+   *          The character representing the quote character to be used in the
+   *          underlying {@code CSVLineReader}
+   * @param closeQuoteChar
+   *          The character representing the quote character to be used in the
+   *          underlying {@code CSVLineReader}
+   * @param escapeChar
+   *          The character representing the escape character to be used in the
+   *          underlying {@code CSVLineReader}
+   */
+  CSVFileReaderFactory(final int bufferSize, final String inputFileEncoding, final char openQuoteChar,
+      final char closeQuoteChar, final char escapeChar) {
+    this.bufferSize = bufferSize;
+    this.inputFileEncoding = inputFileEncoding;
+    this.openQuoteChar = openQuoteChar;
+    this.closeQuoteChar = closeQuoteChar;
+    this.escapeChar = escapeChar;
+  }
+
+  @Override
+  public Iterator<String> read(FileSystem fs, Path path) {
+    FSDataInputStream is;
+    try {
+      is = fs.open(path);
+      return new CSVRecordIterator(is, bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar, escapeChar);
+    } catch (IOException e) {
+      LOG.info("Could not read path: " + path, e);
+      return Iterators.emptyIterator();
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/ce79a7a1/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileSource.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileSource.java b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileSource.java
new file mode 100644
index 0000000..9021f78
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileSource.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.text.csv;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.crunch.ReadableData;
+import org.apache.crunch.impl.mr.run.RuntimeParameters;
+import org.apache.crunch.io.FormatBundle;
+import org.apache.crunch.io.ReadableSource;
+import org.apache.crunch.io.impl.FileSourceImpl;
+import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * A {@code Source} instance that uses the {@code CSVInputFormat}, which gives
+ * each map task one single CSV record, regardless of how many lines it may
+ * span.
+ */
+public class CSVFileSource extends FileSourceImpl<String> implements ReadableSource<String> {
+
+  /**
+   * The key used in the {@code CSVInputFormat}'s {@code FormatBundle} to set
+   * the underlying {@code CSVLineReader}'s buffer size
+   */
+  public static final String CSV_BUFFER_SIZE = "csv.buffersize";
+
+  /**
+   * The key used in the {@code CSVInputFormat}'s {@code FormatBundle} to set
+   * the underlying {@code CSVLineReader}'s input file encoding
+   */
+  public static final String CSV_INPUT_FILE_ENCODING = "csv.inputfileencoding";
+
+  /**
+   * The key used in the {@code CSVInputFormat}'s {@code FormatBundle} to set
+   * the underlying {@code CSVLineReader}'s open quote character
+   */
+  public static final String CSV_OPEN_QUOTE_CHAR = "csv.openquotechar";
+
+  /**
+   * The key used in the {@code CSVInputFormat}'s {@code FormatBundle} to set
+   * the underlying {@code CSVLineReader}'s close quote character
+   */
+  public static final String CSV_CLOSE_QUOTE_CHAR = "csv.closequotechar";
+
+  /**
+   * The key used in the {@code CSVInputFormat}'s {@code FormatBundle} to set
+   * the underlying {@code CSVLineReader}'s escape character
+   */
+  public static final String CSV_ESCAPE_CHAR = "csv.escapechar";
+
+  private int bufferSize;
+  private String inputFileEncoding;
+  private char openQuoteChar;
+  private char closeQuoteChar;
+  private char escapeChar;
+
+  /**
+   * Create a new CSVFileSource instance
+   * 
+   * @param path
+   *          The {@code Path} to the input data
+   */
+  public CSVFileSource(List<Path> paths) {
+    this(paths, CSVLineReader.DEFAULT_BUFFER_SIZE, CSVLineReader.DEFAULT_INPUT_FILE_ENCODING,
+        CSVLineReader.DEFAULT_QUOTE_CHARACTER, CSVLineReader.DEFAULT_QUOTE_CHARACTER,
+        CSVLineReader.DEFAULT_ESCAPE_CHARACTER);
+  }
+
+  /**
+   * Create a new CSVFileSource instance
+   * 
+   * @param path
+   *          The {@code Path} to the input data
+   */
+  public CSVFileSource(Path path) {
+    this(path, CSVLineReader.DEFAULT_BUFFER_SIZE, CSVLineReader.DEFAULT_INPUT_FILE_ENCODING,
+        CSVLineReader.DEFAULT_QUOTE_CHARACTER, CSVLineReader.DEFAULT_QUOTE_CHARACTER,
+        CSVLineReader.DEFAULT_ESCAPE_CHARACTER);
+  }
+
+  /**
+   * Create a new CSVFileSource instance with all configurable options.
+   * 
+   * @param paths
+   *          A list of {@code Path}s to be used as input data.
+   * @param bufferSize
+   *          The size of the buffer to be used in the underlying
+   *          {@code CSVLineReader}
+   * @param inputFileEncoding
+   *          The the encoding of the input file to be read by the underlying
+   *          {@code CSVLineReader}
+   * @param openQuoteChar
+   *          The character representing the quote character to be used in the
+   *          underlying {@code CSVLineReader}
+   * @param closeQuoteChar
+   *          The character representing the quote character to be used in the
+   *          underlying {@code CSVLineReader}
+   * @param escapeChar
+   *          The character representing the escape character to be used in the
+   *          underlying {@code CSVLineReader}
+   */
+  public CSVFileSource(List<Path> paths, final int bufferSize, final String inputFileEncoding,
+      final char openQuoteChar, final char closeQuoteChar, final char escapeChar) {
+    super(paths, Writables.strings(), getCSVBundle(bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar,
+        escapeChar));
+    setPrivateVariables(bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar, escapeChar);
+  }
+
+  /**
+   * Create a new CSVFileSource instance with all configurable options.
+   * 
+   * @param path
+   *          The {@code Path} to the input data
+   * @param bufferSize
+   *          The size of the buffer to be used in the underlying
+   *          {@code CSVLineReader}
+   * @param inputFileEncoding
+   *          The the encoding of the input file to be read by the underlying
+   *          {@code CSVLineReader}
+   * @param openQuoteChar
+   *          The character representing the quote character to be used in the
+   *          underlying {@code CSVLineReader}
+   * @param closeQuoteChar
+   *          The character representing the quote character to be used in the
+   *          underlying {@code CSVLineReader}
+   * @param escapeChar
+   *          The character representing the escape character to be used in the
+   *          underlying {@code CSVLineReader}
+   */
+  public CSVFileSource(Path path, final int bufferSize, final String inputFileEncoding, final char openQuoteChar,
+      final char closeQuoteChar, final char escapeChar) {
+    super(path, Writables.strings(), getCSVBundle(bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar,
+        escapeChar));
+    setPrivateVariables(bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar, escapeChar);
+  }
+
+  @Override
+  public Iterable<String> read(Configuration conf) throws IOException {
+    return read(conf,
+        new CSVFileReaderFactory(bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar, escapeChar));
+  }
+
+  @Override
+  public ReadableData<String> asReadable() {
+    return new CSVReadableData(paths, bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar, escapeChar);
+  }
+
+  @Override
+  public String toString() {
+    return "CSV(" + pathsAsString() + ")";
+  }
+
+  /**
+   * Configures the job with any custom options. These will be retrieved later
+   * by {@code CSVInputFormat}
+   */
+  private static FormatBundle<CSVInputFormat> getCSVBundle(final int bufferSize, final String inputFileEncoding,
+      final char openQuoteChar, final char closeQuoteChar, final char escapeChar) {
+    FormatBundle<CSVInputFormat> bundle = FormatBundle.forInput(CSVInputFormat.class);
+    bundle.set(RuntimeParameters.DISABLE_COMBINE_FILE, "true");
+    bundle.set(CSV_BUFFER_SIZE, String.valueOf(bufferSize));
+    bundle.set(CSV_INPUT_FILE_ENCODING, String.valueOf(inputFileEncoding));
+    bundle.set(CSV_OPEN_QUOTE_CHAR, String.valueOf(openQuoteChar));
+    bundle.set(CSV_CLOSE_QUOTE_CHAR, String.valueOf(closeQuoteChar));
+    bundle.set(CSV_ESCAPE_CHAR, String.valueOf(escapeChar));
+    return bundle;
+  }
+
+  private void setPrivateVariables(final int bufferSize, final String inputFileEncoding, final char openQuoteChar,
+      final char closeQuoteChar, final char escapeChar) {
+    if (isSameCharacter(openQuoteChar, escapeChar)) {
+      throw new IllegalArgumentException("The open quote (" + openQuoteChar + ") and escape (" + escapeChar
+          + ") characters must be different!");
+    }
+    if (isSameCharacter(closeQuoteChar, escapeChar)) {
+      throw new IllegalArgumentException("The close quote (" + closeQuoteChar + ") and escape (" + escapeChar
+          + ") characters must be different!");
+    }
+    this.bufferSize = bufferSize;
+    this.inputFileEncoding = inputFileEncoding;
+    this.openQuoteChar = openQuoteChar;
+    this.closeQuoteChar = closeQuoteChar;
+    this.escapeChar = escapeChar;
+  }
+  
+  private boolean isSameCharacter(final char c1, final char c2) {
+    return c2 == c1;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/ce79a7a1/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVInputFormat.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVInputFormat.java b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVInputFormat.java
new file mode 100644
index 0000000..5d5abe3
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVInputFormat.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.text.csv;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * A {@link FileInputFormat} for use specifically with CSV files. This input
+ * format deals with the fact that CSV files can potentially have multiple lines
+ * within fields which should all be treated as one record.
+ */
+public class CSVInputFormat extends FileInputFormat<LongWritable, Text> {
+  private int bufferSize;
+  private String inputFileEncoding;
+  private char openQuoteChar;
+  private char closeQuoteChar;
+  private char escapeChar;
+
+  /**
+   * This method is used by crunch to get an instance of {@link CSVRecordReader}
+   * 
+   * @param split
+   *          the {@link InputSplit} that will be assigned to the record reader
+   * @param context
+   *          the {@TaskAttemptContext} for the job
+   * @return an instance of {@link CSVRecordReader} created using
+   *         {@link CSVInputFormat#getSeparatorChar()},
+   *         {@link CSVInputFormat#getQuoteChar()}, and
+   *         {@link CSVInputFormat#getEscapeChar()}.
+   */
+  @Override
+  public RecordReader<LongWritable, Text> createRecordReader(final InputSplit split, final TaskAttemptContext context) {
+    return new CSVRecordReader(this.bufferSize, this.inputFileEncoding, this.openQuoteChar, this.closeQuoteChar,
+        this.escapeChar);
+  }
+
+  /**
+   * A method used by crunch to calculate the splits for each file. This will
+   * split each CSV file at the end of a valid CSV record.
+   * 
+   * @param job
+   *          the {@link JobContext} for the current job.
+   * @return a List containing all of the calculated splits for a single file.
+   * @throws IOException
+   *           if an error occurs while accessing HDFS
+   */
+  @Override
+  public List<InputSplit> getSplits(final JobContext job) throws IOException {
+    final long splitSize = job.getConfiguration().getLong("csv.input.split.size", 67108864);
+    final List<InputSplit> splits = new ArrayList<InputSplit>();
+    final Path[] paths = FileUtil.stat2Paths(listStatus(job).toArray(new FileStatus[0]));
+    FileSystem fileSystem = null;
+    FSDataInputStream inputStream = null;
+    try {
+      for (final Path path : paths) {
+        fileSystem = path.getFileSystem(job.getConfiguration());
+        inputStream = fileSystem.open(path);
+        splits.addAll(getSplitsForFile(splitSize, fileSystem.getFileStatus(path).getLen(), path, inputStream));
+      }
+      return splits;
+    } finally {
+      inputStream.close();
+    }
+  }
+
+  /**
+   * In summary, this method will start at the beginning of the file, seek to
+   * the position corresponding to the desired split size, seek to the end of
+   * the line that contains that position, then attempt to seek until the
+   * CSVLineReader indicates that the current position is no longer within a CSV
+   * record. Then, it will mark that position for a split and a repeat its
+   * logic.
+   */
+  @VisibleForTesting
+  protected List<FileSplit> getSplitsForFile(final long splitSize, final long fileSize, final Path fileName,
+      final FSDataInputStream inputStream) throws IOException {
+    final List<FileSplit> splitsList = new ArrayList<FileSplit>();
+
+    long splitStart;
+    long currentPosition = 0;
+
+    boolean endOfFile = false;
+    while (!endOfFile) {
+      // Set the start of this split to the furthest read point in the file
+      splitStart = currentPosition;
+
+      // Skip a number of bytes equal to the desired split size to avoid parsing
+      // every csv line, which greatly increases the run time
+      currentPosition = splitStart + splitSize;
+
+      // The input stream will freak out if we try to seek past the EOF
+      if (currentPosition >= fileSize) {
+        currentPosition = fileSize;
+        endOfFile = true;
+        final FileSplit fileSplit = new FileSplit(fileName, splitStart, currentPosition - splitStart, new String[] {});
+        splitsList.add(fileSplit);
+        break;
+      }
+
+      // Every time we seek to the new approximate split point,
+      // we need to create a new CSVLineReader around the stream.
+      inputStream.seek(currentPosition);
+      final CSVLineReader csvLineReader = new CSVLineReader(inputStream, this.bufferSize, this.inputFileEncoding,
+          this.openQuoteChar, this.closeQuoteChar, this.escapeChar);
+
+      // This line is potentially garbage because we most likely just sought to
+      // the middle of a line. Read the rest of the line and leave it for the
+      // previous split. Then reset the multi-line CSV record boolean, because
+      // the partial line will have a very high chance of falsely triggering the
+      // class wide multi-line logic.
+      currentPosition += csvLineReader.readFileLine(new Text());
+      csvLineReader.resetMultiLine();
+
+      // Now, we may still be in the middle of a multi-line CSV record.
+      currentPosition += csvLineReader.readFileLine(new Text());
+
+      // If we are, read until we are not.
+      while (csvLineReader.isInMultiLine()) {
+        final int bytesRead = csvLineReader.readFileLine(new Text());
+        // End of file
+        if (bytesRead <= 0) {
+          break;
+        }
+        currentPosition += bytesRead;
+      }
+
+      // We're out of the multi-line CSV record, so it's safe to end the
+      // previous split.
+      splitsList.add(new FileSplit(fileName, splitStart, currentPosition - splitStart, new String[] {}));
+    }
+
+    return splitsList;
+  }
+
+  /**
+   * This method will read the configuration that is set in
+   * {@link CSVFileSource}'s private getBundle() method
+   * 
+   * @param jobConf
+   *          The {@code JobConf} instance from which the CSV configuration
+   *          parameters will be read, if necessary.
+   */
+  public void configure(JobConf jobConf) {
+    String bufferValue = jobConf.get(CSVFileSource.CSV_BUFFER_SIZE);
+    if ("".equals(bufferValue)) {
+      bufferSize = CSVLineReader.DEFAULT_BUFFER_SIZE;
+    } else {
+      bufferSize = Integer.parseInt(bufferValue);
+    }
+
+    String inputFileEncodingValue = jobConf.get(CSVFileSource.CSV_INPUT_FILE_ENCODING);
+    if ("".equals(inputFileEncodingValue)) {
+      inputFileEncoding = CSVLineReader.DEFAULT_INPUT_FILE_ENCODING;
+    } else {
+      inputFileEncoding = inputFileEncodingValue;
+    }
+
+    String openQuoteCharValue = jobConf.get(CSVFileSource.CSV_OPEN_QUOTE_CHAR);
+    if ("".equals(openQuoteCharValue)) {
+      openQuoteChar = CSVLineReader.DEFAULT_QUOTE_CHARACTER;
+    } else {
+      openQuoteChar = openQuoteCharValue.charAt(0);
+    }
+
+    String closeQuoteCharValue = jobConf.get(CSVFileSource.CSV_CLOSE_QUOTE_CHAR);
+    if ("".equals(closeQuoteCharValue)) {
+      closeQuoteChar = CSVLineReader.DEFAULT_QUOTE_CHARACTER;
+    } else {
+      closeQuoteChar = closeQuoteCharValue.charAt(0);
+    }
+
+    String escapeCharValue = jobConf.get(CSVFileSource.CSV_ESCAPE_CHAR);
+    if ("".equals(escapeCharValue)) {
+      escapeChar = CSVLineReader.DEFAULT_ESCAPE_CHARACTER;
+    } else {
+      escapeChar = escapeCharValue.charAt(0);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/ce79a7a1/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVLineReader.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVLineReader.java b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVLineReader.java
new file mode 100644
index 0000000..83e2abe
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVLineReader.java
@@ -0,0 +1,406 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.text.csv;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.UnsupportedEncodingException;
+import java.nio.CharBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetEncoder;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.apache.hadoop.io.Text;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A record reader written specifically to read individual lines from CSV files.
+ * Most notably, it can read CSV records which span multiple lines.
+ */
+@ParametersAreNonnullByDefault
+public class CSVLineReader {
+
+  // InputStream related variables
+  /**
+   * The default buffer size (64k) to be used when reading from the InputStream
+   */
+  public static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
+  private final InputStreamReader inputStreamReader;
+  private final String inputFileEncoding;
+  private final CharsetEncoder charsetEncoder;
+  private char[] buffer;
+  private final int bufferSize;
+  private int bufferLength = 0;
+  private int bufferPosition = 0;
+  private boolean bufferIsPadded = false;
+  private static char CR = '\r';
+  private static char LF = '\n';
+  private boolean endOfFile = false;
+
+  // CSV parsing related variables
+  /**
+   * The default character to represent quotation marks, '"'
+   */
+  public static final char DEFAULT_QUOTE_CHARACTER = '"';
+  /**
+   * The default character to represent an escape used before a control
+   * character that should be displayed, '\'
+   */
+  public static final char DEFAULT_ESCAPE_CHARACTER = '\\';
+  /**
+   * The default character to represent a null character, '\0'
+   */
+  public static final char NULL_CHARACTER = '\0';
+  /**
+   * The default input file encoding to read with, UTF-8
+   */
+  public static final String DEFAULT_INPUT_FILE_ENCODING = "UTF-8";
+  private final char openQuoteChar;
+  private final char closeQuoteChar;
+  private final char escape;
+  private boolean inMultiLine = false;
+  private boolean currentlyInQuotes = false;
+  private boolean endOfLineReached = false;
+  private Text inputText = new Text();
+
+  /**
+   * This constructor will use default values for buffer size and control
+   * characters.
+   * 
+   * @param inputStream
+   *          The @{link InputStream} to read from. Note that this input stream
+   *          should start at the very beginning of the CSV file to be read OR
+   *          at the very beginning of a CSV entry. If the input stream starts
+   *          at any other position (such as in the middle of a line) this
+   *          reader will not work properly.
+   * @throws UnsupportedEncodingException
+   */
+  public CSVLineReader(final InputStream inputStream) throws UnsupportedEncodingException {
+    this(inputStream, DEFAULT_BUFFER_SIZE, DEFAULT_INPUT_FILE_ENCODING, DEFAULT_QUOTE_CHARACTER,
+        DEFAULT_QUOTE_CHARACTER, DEFAULT_ESCAPE_CHARACTER);
+  }
+
+  /**
+   * The fully customizable constructor for CSVLineReader
+   * 
+   * @param inputStream
+   *          The @{link InputStream} to read from. Note that this input stream
+   *          should start at the very beginning of the CSV file to be read OR
+   *          at the very beginning of a CSV entry. If the input stream starts
+   *          at any other position (such as in the middle of a line) this
+   *          reader will not work properly.
+   * @param bufferSize
+   *          The size of the buffer used when reading the input stream
+   * @param inputFileEncoding
+   *          The encoding of the file to read from.
+   * @param openQuoteChar
+   *          Used to specify a custom open quote character
+   * @param closeQuoteChar
+   *          Used to specify a custom close quote character
+   * @param escape
+   *          Used to specify a custom escape character
+   * @throws UnsupportedEncodingException
+   */
+  public CSVLineReader(final InputStream inputStream, final int bufferSize, final String inputFileEncoding,
+      final char openQuoteChar, final char closeQuoteChar, final char escapeChar) {
+    Preconditions.checkNotNull(inputStream, "inputStream may not be null");
+    Preconditions.checkNotNull(inputFileEncoding, "inputFileEncoding may not be null");
+    if (bufferSize <= 0) {
+      throw new IllegalArgumentException("The buffer (" + bufferSize + ")cannot be <= 0");
+    }
+
+    // Input Stream related variables
+    try {
+      this.inputStreamReader = new InputStreamReader(inputStream, inputFileEncoding);
+    } catch (final UnsupportedEncodingException uee) {
+      throw new RuntimeException(inputFileEncoding + " is not a supported encoding.", uee);
+    }
+    this.bufferSize = bufferSize;
+    this.buffer = new char[this.bufferSize];
+
+    // CSV parsing related variables
+    if (isSameCharacter(openQuoteChar, escapeChar)) {
+      throw new IllegalArgumentException("The open quote (" + openQuoteChar + ") and escape (" + escapeChar
+          + ") characters must be different!");
+    }
+    if (isSameCharacter(closeQuoteChar, escapeChar)) {
+      throw new IllegalArgumentException("The close quote (" + closeQuoteChar + ") and escape (" + escapeChar
+          + ") characters must be different!");
+    }
+    this.openQuoteChar = openQuoteChar;
+    this.closeQuoteChar = closeQuoteChar;
+    this.escape = escapeChar;
+    this.inputFileEncoding = inputFileEncoding;
+    this.charsetEncoder = Charset.forName(inputFileEncoding).newEncoder();
+  }
+
+  /**
+   * This method will read through one full CSV record, place its content into
+   * the input Text and return the number of bytes (including newline
+   * characters) that were consumed.
+   * 
+   * @param input
+   *          a mutable @{link Text} object into which the text of the CSV
+   *          record will be stored, without any line feeds or carriage returns
+   * @return the number of byes that were read, including any control
+   *         characters, line feeds, or carriage returns.
+   * @throws IOException
+   *           if an IOException occurs while handling the file to be read
+   */
+  public int readCSVLine(final Text input) throws IOException {
+    Preconditions.checkNotNull(input, "inputText may not be null");
+    inputText = new Text(input);
+    long totalBytesConsumed = 0;
+    if (endOfFile) {
+      return 0;
+    }
+    if (inMultiLine) {
+      throw new RuntimeException("Cannot begin reading a CSV record while inside of a multi-line CSV record.");
+    }
+
+    inputText.clear();
+    do {
+      totalBytesConsumed += readFileLine(inputText);
+      // a line has been read. We need to see if we're still in quotes and tack
+      // on a newline if so
+      if (currentlyInQuotes && !endOfFile) {
+        // Add one LF to mark the line return, otherwise any multi-line CSV
+        // record will all be on one line.
+        inputText.set(new StringBuilder().append(inputText.toString()).append('\n').toString());
+      }
+    } while (currentlyInQuotes);
+
+    if (totalBytesConsumed > Integer.MAX_VALUE) {
+      throw new IOException("Too many bytes consumed before newline: " + Integer.MAX_VALUE);
+    }
+
+    input.set(inputText);
+    return (int) totalBytesConsumed;
+  }
+
+  /**
+   * A method for reading through one single line in the CSV file, that is, it
+   * will read until the first line feed, carriage return, or set of both is
+   * found. The CSV parsing logic markers are maintained outside of this method
+   * to enable the manipulation that logic in order to find the beginning of a
+   * CSV record. Use {@link CSVLineReader#isInMultiLine()} and
+   * {@link CSVLineReader#resetMultiLine()} to do so. See
+   * {@link CSVInputFormat#getSplitsForFile(long, long, org.apache.hadoop.fs.Path, org.apache.hadoop.fs.FSDataInputStream)}
+   * for an example.
+   * 
+   * @param input
+   *          a mutable @{link Text} object into which the text of the line will
+   *          be stored, without any line feeds or carriage returns
+   * @return the number of byes that were read, including any control
+   *         characters, line feeds, or carriage returns.
+   * @throws IOException
+   *           if an IOException occurs while handling the file to be read
+   */
+  public int readFileLine(final Text input) throws IOException {
+    Preconditions.checkNotNull(input, "inputText may not be null");
+    if (endOfFile) {
+      return 0;
+    }
+
+    // This integer keeps track of the number of newline characters used to
+    // terminate the line being read. This
+    // could be 1, in the case of LF or CR, or 2, in the case of CRLF.
+    int newlineLength = 0;
+    int inputTextLength = 0;
+    long bytesConsumed = 0;
+    int readTextLength = 0;
+    int startPosition = bufferPosition;
+    endOfLineReached = false;
+    inputText = new Text(input);
+
+    do {
+      boolean checkForLF = false;
+      // Figure out where we are in the buffer and fill it if necessary.
+      if (bufferPosition >= bufferLength) {
+        refillBuffer();
+        startPosition = bufferPosition;
+        if (endOfFile) {
+          break;
+        }
+      }
+
+      newlineLength = 0;
+      // Iterate through the buffer looking for newline characters while keeping
+      // track of if we're in a field
+      // and/or in quotes.
+      for (; bufferPosition < bufferLength; ++bufferPosition) {
+        bytesConsumed += calculateCharacterByteLength(buffer[bufferPosition]);
+        if (buffer[bufferPosition] == this.escape) {
+          if (isNextCharacterEscapable(currentlyInQuotes, bufferPosition)) {
+            // checks to see if we are in quotes and if the next character is a
+            // quote or an escape
+            // character. If so, that's fine. Record the next character's size
+            // and skip it.
+            ++bufferPosition;
+            bytesConsumed += calculateCharacterByteLength(buffer[bufferPosition]);
+          }
+        } else if (buffer[bufferPosition] == openQuoteChar || buffer[bufferPosition] == closeQuoteChar) {
+          // toggle currentlyInQuotes if we've hit a non-escaped quote character
+          currentlyInQuotes = !currentlyInQuotes;
+        } else if (buffer[bufferPosition] == LF || buffer[bufferPosition] == CR) {
+          boolean lastCharWasCR = buffer[bufferPosition] == CR;
+          // Line is over, make note and increment the size of the newlinelength
+          // counter.
+          endOfLineReached = true;
+          ++newlineLength;
+          ++bufferPosition;
+          if (lastCharWasCR && buffer[bufferPosition] == LF) {
+            lastCharWasCR = false;
+            // Check for LF (in case of CRLF line endings) and increment the
+            // counter, skip it by moving the
+            // buffer position, then record the length of the LF.
+            ++newlineLength;
+            ++bufferPosition;
+            bytesConsumed += calculateCharacterByteLength(buffer[bufferPosition]);
+          } else if (lastCharWasCR && bufferPosition >= bufferLength) {
+            // We just read a CR at the very end of the buffer. If this is a
+            // file with CRLF line endings, there will be a LF next that we need
+            // to check for and account for in bytesRead before we count this
+            // line as "read".
+            checkForLF = true;
+          }
+          break;
+        }
+      }
+      // This is the length of the actual text and important stuff in the line.
+      readTextLength = bufferPosition - startPosition - newlineLength;
+
+      // Append the results.
+      if (readTextLength > Integer.MAX_VALUE - inputTextLength) {
+        readTextLength = Integer.MAX_VALUE - inputTextLength;
+      }
+      if (readTextLength > 0) {
+        // This will append the portion of the buffer containing only the
+        // important text, omitting any newline characters
+        inputText.set(new StringBuilder().append(inputText.toString())
+            .append(new String(buffer, startPosition, readTextLength)).toString());
+        inputTextLength += readTextLength;
+      }
+
+      // If the last character we read was a CR at the end of the buffer, we
+      // need to check for an LF after a buffer refill.
+      if (checkForLF) {
+        refillBuffer();
+        if (endOfFile) {
+          break;
+        }
+        if (buffer[bufferPosition] == LF) {
+          bytesConsumed += calculateCharacterByteLength(buffer[bufferPosition]);
+          ++bufferPosition;
+          ++newlineLength;
+        }
+      }
+
+    } while (newlineLength == 0 && bytesConsumed < Integer.MAX_VALUE);
+
+    if (endOfLineReached) {
+      if (currentlyInQuotes) {
+        inMultiLine = true;
+      } else {
+        inMultiLine = false;
+      }
+    }
+
+    if (bytesConsumed > Integer.MAX_VALUE) {
+      throw new IOException("Too many bytes consumed before newline: " + Integer.MAX_VALUE);
+    }
+
+    input.set(inputText);
+    return (int) bytesConsumed;
+  }
+
+  /**
+   * For use with {@link CSVLineReader#readFileLine(Text)}. Returns current
+   * multi-line CSV status.
+   * 
+   * @return a boolean signifying if the last
+   *         {@link CSVLineReader#readFileLine(Text)} call ended in the middle
+   *         of a multi-line CSV record
+   */
+  public boolean isInMultiLine() {
+    return inMultiLine;
+  }
+
+  /**
+   * For use with {@link CSVLineReader#readFileLine(Text)}. Resets current
+   * multi-line CSV status.
+   */
+  public void resetMultiLine() {
+    inMultiLine = false;
+    currentlyInQuotes = false;
+  }
+
+  private boolean isSameCharacter(final char c1, final char c2) {
+    return c1 != NULL_CHARACTER && c1 == c2;
+  }
+
+  private boolean isNextCharacterEscapable(final boolean inQuotes, final int i) {
+    return inQuotes // we are in quotes, therefore there can be escaped quotes
+                    // in here.
+        && buffer.length > (i + 1) // there is indeed another character to
+                                   // check.
+        && (buffer[i + 1] == closeQuoteChar || buffer[i + 1] == openQuoteChar || buffer[i + 1] == this.escape);
+  }
+
+  private void refillBuffer() throws IOException {
+    bufferPosition = 0;
+
+    // Undo the buffer padding
+    if (bufferIsPadded) {
+      buffer = new char[bufferLength];
+      bufferIsPadded = false;
+    }
+
+    bufferLength = inputStreamReader.read(buffer, 0, buffer.length);
+    // if bufferLength < bufferSize, this buffer will contain the end of the
+    // file. However, our line logic needs to be able to see what's a few spots
+    // past the current position. This will cause an index out of bounds
+    // exception if the buffer is full. So, my solution is to add a few extra
+    // spaces to the buffer so that the logic can still read ahead.
+    if (buffer.length == bufferLength) {
+      final char[] biggerBuffer = new char[bufferLength + 3];
+      for (int i = 0; i < bufferLength; i++) {
+        biggerBuffer[i] = buffer[i];
+      }
+      buffer = biggerBuffer;
+      bufferIsPadded = true;
+    }
+
+    if (bufferLength <= 0) {
+      endOfFile = true;
+    }
+  }
+
+  private int calculateCharacterByteLength(final char character) {
+    try {
+      return charsetEncoder.encode(CharBuffer.wrap(new char[] { character })).limit();
+    } catch (final CharacterCodingException e) {
+      throw new RuntimeException("The character attempting to be read (" + character + ") could not be encoded with "
+          + inputFileEncoding);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/ce79a7a1/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVReadableData.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVReadableData.java b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVReadableData.java
new file mode 100644
index 0000000..63e74d9
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVReadableData.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.text.csv;
+
+import java.util.List;
+
+import org.apache.crunch.io.FileReaderFactory;
+import org.apache.crunch.io.impl.ReadableDataImpl;
+import org.apache.hadoop.fs.Path;
+
+public class CSVReadableData extends ReadableDataImpl<String> {
+
+  private int bufferSize;
+  private String inputFileEncoding;
+  private char openQuoteChar;
+  private char closeQuoteChar;
+  private char escapeChar;
+
+  /**
+   * Creates an instance of {@code CSVReadableData} with default configuration
+   * 
+   * @param paths
+   *          The paths of the files to be read
+   */
+  protected CSVReadableData(List<Path> paths) {
+    this(paths, CSVLineReader.DEFAULT_BUFFER_SIZE, CSVLineReader.DEFAULT_INPUT_FILE_ENCODING,
+        CSVLineReader.DEFAULT_QUOTE_CHARACTER, CSVLineReader.DEFAULT_QUOTE_CHARACTER,
+        CSVLineReader.DEFAULT_ESCAPE_CHARACTER);
+  }
+
+  /**
+   * Creates an instance of {@code CSVReadableData} with specified configuration
+   * @param paths
+   *          a list of input file paths
+   * @param bufferSize
+   *          the size of the buffer to use while parsing through the input file
+   * @param inputFileEncoding
+   *          the encoding for the input file
+   * @param openQuote
+   *          the character to use to open quote blocks
+   * @param closeQuote
+   *          the character to use to close quote blocks
+   * @param escape
+   *          the character to use for escaping control characters and quotes
+   */
+  protected CSVReadableData(List<Path> paths, final int bufferSize, final String inputFileEncoding,
+      final char openQuoteChar, final char closeQuoteChar, final char escapeChar) {
+    super(paths);
+    this.bufferSize = bufferSize;
+    this.inputFileEncoding = inputFileEncoding;
+    this.openQuoteChar = openQuoteChar;
+    this.closeQuoteChar = closeQuoteChar;
+    this.escapeChar = escapeChar;
+  }
+
+  @Override
+  protected FileReaderFactory<String> getFileReaderFactory() {
+    return new CSVFileReaderFactory(bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar, escapeChar);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/ce79a7a1/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordIterator.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordIterator.java b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordIterator.java
new file mode 100644
index 0000000..df5e39c
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordIterator.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.text.csv;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.Text;
+
+import com.google.common.io.Closeables;
+
+/**
+ * An {@code Iterator} for an internally created {@code CSVLineReader}
+ */
+public class CSVRecordIterator implements Iterator<String>, Closeable {
+  private CSVLineReader csvLineReader;
+  private InputStream inputStream;
+  private String currentLine;
+
+  /**
+   * Creates an instance of {@code CSVRecordIterator} with default configuration
+   * 
+   * @param inputStream
+   *          The {@code InputStream} for the CSV file to iterate over
+   * @throws UnsupportedEncodingException
+   */
+  public CSVRecordIterator(final InputStream inputStream) throws UnsupportedEncodingException {
+    this(inputStream, CSVLineReader.DEFAULT_BUFFER_SIZE, CSVLineReader.DEFAULT_INPUT_FILE_ENCODING,
+        CSVLineReader.DEFAULT_QUOTE_CHARACTER, CSVLineReader.DEFAULT_QUOTE_CHARACTER,
+        CSVLineReader.DEFAULT_ESCAPE_CHARACTER);
+  }
+
+  /**
+   * Creates an instance of {@code CSVRecordIterator} with custom configuration
+   * 
+   * @param inputStream
+   *          The {@code InputStream} for the CSV file to iterate over
+   * @param bufferSize
+   *          The size of the buffer used when reading the input stream
+   * @param inputFileEncoding
+   *          the encoding for the input file
+   * @param openQuote
+   *          the character to use to open quote blocks
+   * @param closeQuote
+   *          the character to use to close quote blocks
+   * @param escape
+   *          the character to use for escaping control characters and quotes
+   * @throws UnsupportedEncodingException
+   */
+  public CSVRecordIterator(final InputStream inputStream, final int bufferSize, final String inputFileEncoding,
+      final char openQuoteChar, final char closeQuoteChar, final char escapeChar) throws UnsupportedEncodingException {
+    csvLineReader = new CSVLineReader(inputStream, bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar,
+        escapeChar);
+    this.inputStream = inputStream;
+    incrementValue();
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (!(currentLine == null)) {
+      return true;
+    }
+    Closeables.closeQuietly(this);
+    return false;
+  }
+
+  @Override
+  public String next() {
+    String result = currentLine;
+    incrementValue();
+    return result;
+  }
+
+  @Override
+  public void remove() {
+    incrementValue();
+  }
+
+  private void incrementValue() {
+    Text tempText = new Text();
+    try {
+      csvLineReader.readCSVLine(tempText);
+    } catch (IOException e) {
+      throw new RuntimeException("A problem occurred accessing the underlying CSV file stream.", e);
+    }
+    String tempTextAsString = tempText.toString();
+    if ("".equals(tempTextAsString)) {
+      currentLine = null;
+    } else {
+      currentLine = tempTextAsString;
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (inputStream != null) {
+      inputStream.close();
+      inputStream = null;
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/ce79a7a1/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordReader.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordReader.java b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordReader.java
new file mode 100644
index 0000000..d04da98
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordReader.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.text.csv;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * An extension of {@link RecordReader} used to intelligently read CSV files
+ */
+public class CSVRecordReader extends RecordReader<LongWritable, Text> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(CSVRecordReader.class);
+  private long start;
+  private long pos;
+  private long end;
+  private LongWritable key = null;
+  private Text value = null;
+
+  private FSDataInputStream fileIn;
+  private CSVLineReader csvLineReader;
+  private final char openQuote;
+  private final char closeQuote;
+  private final char escape;
+  private final String inputFileEncoding;
+  private final int fileStreamBufferSize;
+
+  private int totalRecordsRead = 0;
+
+  /**
+   * Default constructor, specifies default values for the {@link CSVLineReader}
+   */
+  public CSVRecordReader() {
+    this(CSVLineReader.DEFAULT_BUFFER_SIZE, CSVLineReader.DEFAULT_INPUT_FILE_ENCODING,
+        CSVLineReader.DEFAULT_QUOTE_CHARACTER, CSVLineReader.DEFAULT_QUOTE_CHARACTER,
+        CSVLineReader.DEFAULT_ESCAPE_CHARACTER);
+  }
+
+  /**
+   * Customizable constructor used to specify all input parameters for the
+   * {@link CSVLineReader}
+   * 
+   * @param bufferSize
+   *          the size of the buffer to use while parsing through the input file
+   * @param inputFileEncoding
+   *          the encoding for the input file
+   * @param openQuote
+   *          the character to use to open quote blocks
+   * @param closeQuote
+   *          the character to use to close quote blocks
+   * @param escape
+   *          the character to use for escaping control characters and quotes
+   */
+  public CSVRecordReader(final int bufferSize, final String inputFileEncoding, final char openQuote,
+      final char closeQuote, final char escape) {
+    Preconditions.checkNotNull(openQuote, "quote cannot be null.");
+    Preconditions.checkNotNull(closeQuote, "quote cannot be null.");
+    Preconditions.checkNotNull(escape, "escape cannot be null.");
+
+    this.fileStreamBufferSize = bufferSize;
+    this.inputFileEncoding = inputFileEncoding;
+    this.openQuote = openQuote;
+    this.closeQuote = closeQuote;
+    this.escape = escape;
+  }
+
+  /**
+   * Initializes the record reader
+   * 
+   * @param genericSplit
+   *          the split assigned to this record reader
+   * @param context
+   *          the job context for this record reader
+   * @throws IOException
+   *           if an IOException occurs while handling the file to be read
+   */
+  @Override
+  public void initialize(final InputSplit genericSplit, final TaskAttemptContext context) throws IOException {
+    final FileSplit split = (FileSplit) genericSplit;
+    final Configuration job = context.getConfiguration();
+
+    start = split.getStart();
+    end = start + split.getLength();
+    this.pos = start;
+
+    final Path file = split.getPath();
+    LOGGER.info("Initializing processing of split for file: " + file);
+    LOGGER.info("File size is: " + file.getFileSystem(job).getFileStatus(file).getLen());
+    LOGGER.info("Split starts at: " + start);
+    LOGGER.info("Split will end at: " + end);
+
+    // Open the file, seek to the start of the split, then wrap it in a
+    // CSVLineReader
+    fileIn = file.getFileSystem(job).open(file);
+    fileIn.seek(start);
+    csvLineReader = new CSVLineReader(fileIn, this.fileStreamBufferSize, inputFileEncoding, this.openQuote,
+        this.closeQuote, this.escape);
+  }
+
+  /**
+   * Increments the key and value pair for this reader
+   * 
+   * @return true if there is another key/value to be read, false if not.
+   * @throws IOException
+   *           if an IOException occurs while handling the file to be read
+   */
+  @Override
+  public boolean nextKeyValue() throws IOException {
+    if (key == null) {
+      key = new LongWritable();
+    }
+    key.set(pos);
+    if (value == null) {
+      value = new Text();
+    }
+
+    if (pos >= end) {
+      key = null;
+      value = null;
+      LOGGER.info("End of split reached, ending processing. Total records read for this split: " + totalRecordsRead);
+      close();
+      return false;
+    }
+
+    final int newSize = csvLineReader.readCSVLine(value);
+
+    if (newSize == 0) {
+      LOGGER.info("End of file reached. Ending processing. Total records read for this split: " + totalRecordsRead);
+      return false;
+    }
+
+    pos += newSize;
+    totalRecordsRead++;
+    return true;
+  }
+
+  /**
+   * Returns the current key
+   * 
+   * @return the key corresponding to the current value
+   */
+  @Override
+  public LongWritable getCurrentKey() {
+    return key;
+  }
+
+  /**
+   * Returns the current value
+   * 
+   * @return the value corresponding to the current key
+   */
+  @Override
+  public Text getCurrentValue() {
+    return value;
+  }
+
+  /**
+   * Get the progress within the split
+   */
+  @Override
+  public float getProgress() {
+    if (start == end) {
+      return 0.0f;
+    }
+    return Math.min(1.0f, (pos - start) / (float) (end - start));
+  }
+
+  /**
+   * Closes the file input stream for this record reader
+   */
+  @Override
+  public synchronized void close() throws IOException {
+    fileIn.close();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/ce79a7a1/crunch-core/src/test/java/org/apache/crunch/io/text/csv/CSVLineReaderTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/io/text/csv/CSVLineReaderTest.java b/crunch-core/src/test/java/org/apache/crunch/io/text/csv/CSVLineReaderTest.java
new file mode 100644
index 0000000..55f2f1f
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/io/text/csv/CSVLineReaderTest.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.text.csv;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.hadoop.io.Text;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class CSVLineReaderTest {
+  @Rule
+  public transient TemporaryPath tmpDir = TemporaryPaths.create();
+
+  @Test
+  public void testVariousUTF8Characters() throws IOException {
+    final String variousCharacters = "€Abبώиב¥£€¢₡₢₣₤₥₦§₧₨₩₪₫₭₮漢Ä©óíßä";
+    String utf8Junk = tmpDir.copyResourceFileName("UTF8.csv");
+    FileInputStream fileInputStream = null;
+    try {
+
+      fileInputStream = new FileInputStream(new File(utf8Junk));
+      final CSVLineReader csvLineReader = new CSVLineReader(fileInputStream);
+      final Text readText = new Text();
+      csvLineReader.readCSVLine(readText);
+      assertEquals(variousCharacters, readText.toString());
+    } finally {
+      fileInputStream.close();
+    }
+  }
+
+  /**
+   * This is effectively a mirror the address tests, but using Chinese
+   * characters, even for the quotation marks and escape characters.
+   * 
+   * @throws IOException
+   */
+  @Test
+  public void testBrokenLineParsingInChinese() throws IOException {
+    final String[] expectedChineseLines = { "您好我叫马克,我从亚拉巴马州来,我是软件工程师,我二十八岁", "我有一个宠物,它是一个小猫,它六岁,它很漂亮",
+        "我喜欢吃饭,“我觉得这个饭最好\n*蛋糕\n*包子\n*冰淇淋\n*啤酒“,他们都很好,我也很喜欢奶酪但它是不健康的", "我是男的,我的头发很短,我穿蓝色的裤子,“我穿黑色的、“衣服”" };
+    String chineseLines = tmpDir.copyResourceFileName("brokenChineseLines.csv");
+    FileInputStream fileInputStream = null;
+    try {
+      fileInputStream = new FileInputStream(new File(chineseLines));
+      final CSVLineReader csvLineReader = new CSVLineReader(fileInputStream, CSVLineReader.DEFAULT_BUFFER_SIZE,
+          CSVLineReader.DEFAULT_INPUT_FILE_ENCODING, '“', '”', '、');
+      for (int i = 0; i < expectedChineseLines.length; i++) {
+        final Text readText = new Text();
+        csvLineReader.readCSVLine(readText);
+        assertEquals(expectedChineseLines[i], readText.toString());
+      }
+    } finally {
+      fileInputStream.close();
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/ce79a7a1/crunch-core/src/test/java/org/apache/crunch/io/text/csv/CSVRecordIteratorTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/io/text/csv/CSVRecordIteratorTest.java b/crunch-core/src/test/java/org/apache/crunch/io/text/csv/CSVRecordIteratorTest.java
new file mode 100644
index 0000000..306a617
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/io/text/csv/CSVRecordIteratorTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.text.csv;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class CSVRecordIteratorTest {
+  @Rule
+  public transient TemporaryPath tmpDir = TemporaryPaths.create();
+
+  @Test
+  public void testVanillaCSV() throws IOException {
+    String[] expectedFileContents = { "1,2,3,4", "5,6,7,8", "9,10,11", "12,13,14" };
+
+    String vanillaCSVFile = tmpDir.copyResourceFileName("vanilla.csv");
+    File vanillaFile = new File(vanillaCSVFile);
+    InputStream inputStream = new FileInputStream(vanillaFile);
+    CSVRecordIterator csvRecordIterator = new CSVRecordIterator(inputStream);
+
+    ArrayList<String> fileContents = new ArrayList<String>(5);
+    while (csvRecordIterator.hasNext()) {
+      fileContents.add(csvRecordIterator.next());
+    }
+
+    for (int i = 0; i < expectedFileContents.length; i++) {
+      assertEquals(expectedFileContents[i], fileContents.get(i));
+    }
+  }
+
+  @Test
+  public void testCSVWithNewlines() throws IOException {
+    String[] expectedFileContents = {
+        "\"Champion, Mac\",\"1234 Hoth St.\n\tApartment 101\n\tAtlanta, GA\n\t64086\",\"30\",\"M\",\"5/28/2010 12:00:00 AM\",\"Just some guy\"",
+        "\"Champion, Mac\",\"5678 Tatooine Rd. Apt 5, Mobile, AL 36608\",\"30\",\"M\",\"Some other date\",\"short description\"" };
+    String csvWithNewlines = tmpDir.copyResourceFileName("withNewlines.csv");
+    File fileWithNewlines = new File(csvWithNewlines);
+    InputStream inputStream = new FileInputStream(fileWithNewlines);
+    CSVRecordIterator csvRecordIterator = new CSVRecordIterator(inputStream);
+
+    ArrayList<String> fileContents = new ArrayList<String>(2);
+    while (csvRecordIterator.hasNext()) {
+      fileContents.add(csvRecordIterator.next());
+    }
+
+    for (int i = 0; i < expectedFileContents.length; i++) {
+      assertEquals(expectedFileContents[i], fileContents.get(i));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/ce79a7a1/crunch-test/src/main/resources/UTF8.csv
----------------------------------------------------------------------
diff --git a/crunch-test/src/main/resources/UTF8.csv b/crunch-test/src/main/resources/UTF8.csv
new file mode 100644
index 0000000..6f0b647
--- /dev/null
+++ b/crunch-test/src/main/resources/UTF8.csv
@@ -0,0 +1 @@
+€Abبώиב¥£€¢₡₢₣₤₥₦§₧₨₩₪₫₭₮漢Ä©óíßä
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/ce79a7a1/crunch-test/src/main/resources/brokenChineseLines.csv
----------------------------------------------------------------------
diff --git a/crunch-test/src/main/resources/brokenChineseLines.csv b/crunch-test/src/main/resources/brokenChineseLines.csv
new file mode 100644
index 0000000..a082afc
--- /dev/null
+++ b/crunch-test/src/main/resources/brokenChineseLines.csv
@@ -0,0 +1,8 @@
+您好我叫马克,我从亚拉巴马州来,我是软件工程师,我二十八岁
+我有一个宠物,它是一个小猫,它六岁,它很漂亮
+我喜欢吃饭,“我觉得这个饭最好
+*蛋糕
+*包子
+*冰淇淋
+*啤酒“,他们都很好,我也很喜欢奶酪但它是不健康的
+我是男的,我的头发很短,我穿蓝色的裤子,“我穿黑色的、“衣服”
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/ce79a7a1/crunch-test/src/main/resources/customQuoteCharWithNewlines.csv
----------------------------------------------------------------------
diff --git a/crunch-test/src/main/resources/customQuoteCharWithNewlines.csv b/crunch-test/src/main/resources/customQuoteCharWithNewlines.csv
new file mode 100644
index 0000000..b3644c4
--- /dev/null
+++ b/crunch-test/src/main/resources/customQuoteCharWithNewlines.csv
@@ -0,0 +1,5 @@
+*Champion, Mac*,*1234 Hoth St.
+	Apartment 101
+	Atlanta, GA
+	64086*,*30*,*M*,*5/28/2010 12:00:00 AM*,*Just some guy*
+*Mac, Champion*,*5678 Tatooine Rd. Apt 5, Mobile, AL 36608*,*30*,*M*,*Some other date*,*short description*
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/ce79a7a1/crunch-test/src/main/resources/vanilla.csv
----------------------------------------------------------------------
diff --git a/crunch-test/src/main/resources/vanilla.csv b/crunch-test/src/main/resources/vanilla.csv
new file mode 100644
index 0000000..09a3185
--- /dev/null
+++ b/crunch-test/src/main/resources/vanilla.csv
@@ -0,0 +1,4 @@
+1,2,3,4
+5,6,7,8
+9,10,11
+12,13,14
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/ce79a7a1/crunch-test/src/main/resources/withNewlines.csv
----------------------------------------------------------------------
diff --git a/crunch-test/src/main/resources/withNewlines.csv b/crunch-test/src/main/resources/withNewlines.csv
new file mode 100644
index 0000000..be7903c
--- /dev/null
+++ b/crunch-test/src/main/resources/withNewlines.csv
@@ -0,0 +1,5 @@
+"Champion, Mac","1234 Hoth St.
+	Apartment 101
+	Atlanta, GA
+	64086","30","M","5/28/2010 12:00:00 AM","Just some guy"
+"Champion, Mac","5678 Tatooine Rd. Apt 5, Mobile, AL 36608","30","M","Some other date","short description"
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/ce79a7a1/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 9e29799..813c82d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -604,6 +604,7 @@ under the License.
             <exclude>**/resources/*.txt</exclude>
             <exclude>**/resources/**/*.txt</exclude>
             <exclude>**/resources/*.avro</exclude>
+            <exclude>**/resources/*.csv</exclude>
             <exclude>**/goal.txt</exclude>
             <exclude>**/target/generated-test-sources/**</exclude>
             <exclude>**/scripts/scrunch</exclude>