You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by mk...@apache.org on 2014/05/21 19:53:12 UTC
git commit: CRUNCH-362: Add CSVFileSource
Repository: crunch
Updated Branches:
refs/heads/master 657cdd6b3 -> ce79a7a1b
CRUNCH-362: Add CSVFileSource
Signed-off-by: Micah Whitacre <mk...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/ce79a7a1
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/ce79a7a1
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/ce79a7a1
Branch: refs/heads/master
Commit: ce79a7a1be6cc4670d4fcf58184028a0663ef139
Parents: 657cdd6
Author: Mac Champion <gi...@gmail.com>
Authored: Wed Mar 26 16:22:33 2014 -0500
Committer: Micah Whitacre <mk...@apache.org>
Committed: Wed May 21 12:50:46 2014 -0500
----------------------------------------------------------------------
.../crunch/io/text/csv/CSVFileSourceIT.java | 119 ++++++
.../io/text/csv/CSVFileReaderFactory.java | 94 +++++
.../crunch/io/text/csv/CSVFileSource.java | 207 ++++++++++
.../crunch/io/text/csv/CSVInputFormat.java | 213 ++++++++++
.../crunch/io/text/csv/CSVLineReader.java | 406 +++++++++++++++++++
.../crunch/io/text/csv/CSVReadableData.java | 75 ++++
.../crunch/io/text/csv/CSVRecordIterator.java | 119 ++++++
.../crunch/io/text/csv/CSVRecordReader.java | 202 +++++++++
.../crunch/io/text/csv/CSVLineReaderTest.java | 78 ++++
.../io/text/csv/CSVRecordIteratorTest.java | 75 ++++
crunch-test/src/main/resources/UTF8.csv | 1 +
.../src/main/resources/brokenChineseLines.csv | 8 +
.../resources/customQuoteCharWithNewlines.csv | 5 +
crunch-test/src/main/resources/vanilla.csv | 4 +
crunch-test/src/main/resources/withNewlines.csv | 5 +
pom.xml | 1 +
16 files changed, 1612 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/ce79a7a1/crunch-core/src/it/java/org/apache/crunch/io/text/csv/CSVFileSourceIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/text/csv/CSVFileSourceIT.java b/crunch-core/src/it/java/org/apache/crunch/io/text/csv/CSVFileSourceIT.java
new file mode 100644
index 0000000..a81f78d
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/io/text/csv/CSVFileSourceIT.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.text.csv;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Collection;
+import static org.junit.Assert.*;
+
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Integration tests that read the bundled CSV resources through a
+ * {@code CSVFileSource} on an {@code MRPipeline} and check that every expected
+ * record (including multi-line ones) is present in the output.
+ */
+public class CSVFileSourceIT {
+  @Rule
+  public transient TemporaryPath tmpDir = TemporaryPaths.create();
+
+  @Test
+  public void testVanillaCSV() throws Exception {
+    final String[] expectedRecords = { "1,2,3,4", "5,6,7,8", "9,10,11", "12,13,14" };
+
+    final Path input = new Path(tmpDir.copyResourceFileName("vanilla.csv"));
+    assertAllPresent(expectedRecords, readThroughPipeline(new CSVFileSource(input)));
+  }
+
+  @Test
+  public void testCSVWithNewlines() throws Exception {
+    final String[] expectedRecords = {
+        "\"Champion, Mac\",\"1234 Hoth St.\n\tApartment 101\n\tAtlanta, GA\n\t64086\",\"30\",\"M\",\"5/28/2010 12:00:00 AM\",\"Just some guy\"",
+        "\"Champion, Mac\",\"5678 Tatooine Rd. Apt 5, Mobile, AL 36608\",\"30\",\"M\",\"Some other date\",\"short description\"" };
+
+    final Path input = new Path(tmpDir.copyResourceFileName("withNewlines.csv"));
+    assertAllPresent(expectedRecords, readThroughPipeline(new CSVFileSource(input)));
+  }
+
+  /**
+   * Verifies that custom quote characters configured on the FileSource are
+   * picked up and used later by the InputFormat.
+   */
+  @Test
+  public void testCSVWithCustomQuoteAndNewlines() throws IOException {
+    final String[] expectedRecords = {
+        "*Champion, Mac*,*1234 Hoth St.\n\tApartment 101\n\tAtlanta, GA\n\t64086*,*30*,*M*,*5/28/2010 12:00:00 AM*,*Just some guy*",
+        "*Mac, Champion*,*5678 Tatooine Rd. Apt 5, Mobile, AL 36608*,*30*,*M*,*Some other date*,*short description*" };
+
+    final Path input = new Path(tmpDir.copyResourceFileName("customQuoteCharWithNewlines.csv"));
+    final CSVFileSource source = new CSVFileSource(input, CSVLineReader.DEFAULT_BUFFER_SIZE,
+        CSVLineReader.DEFAULT_INPUT_FILE_ENCODING, '*', '*', CSVLineReader.DEFAULT_ESCAPE_CHARACTER);
+    assertAllPresent(expectedRecords, readThroughPipeline(source));
+  }
+
+  /**
+   * A mirror of the address tests above, but using Chinese characters for the
+   * data and also for the open quote, close quote and escape characters.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testBrokenLineParsingInChinese() throws IOException {
+    final String[] expectedChineseLines = { "您好我叫马克,我从亚拉巴马州来,我是软件工程师,我二十八岁", "我有一个宠物,它是一个小猫,它六岁,它很漂亮",
+        "我喜欢吃饭,“我觉得这个饭最好\n*蛋糕\n*包子\n*冰淇淋\n*啤酒“,他们都很好,我也很喜欢奶酪但它是不健康的", "我是男的,我的头发很短,我穿蓝色的裤子,“我穿黑色的、“衣服”" };
+
+    final Path input = new Path(tmpDir.copyResourceFileName("brokenChineseLines.csv"));
+    final CSVFileSource source = new CSVFileSource(input, CSVLineReader.DEFAULT_BUFFER_SIZE,
+        CSVLineReader.DEFAULT_INPUT_FILE_ENCODING, '“', '”', '、');
+    assertAllPresent(expectedChineseLines, readThroughPipeline(source));
+  }
+
+  /**
+   * Runs a fresh MR pipeline that reads {@code source} and returns the
+   * materialized records.
+   */
+  private Collection<String> readThroughPipeline(final CSVFileSource source) {
+    final Pipeline pipeline = new MRPipeline(CSVFileSourceIT.class, tmpDir.getDefaultConfiguration());
+    final PCollection<String> records = pipeline.read(source);
+    pipeline.run();
+    return records.asCollection().getValue();
+  }
+
+  /** Asserts that every expected record appears somewhere in {@code actual}. */
+  private static void assertAllPresent(final String[] expected, final Collection<String> actual) {
+    for (final String record : expected) {
+      assertTrue(actual.contains(record));
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/crunch/blob/ce79a7a1/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileReaderFactory.java b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileReaderFactory.java
new file mode 100644
index 0000000..c1c687e
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileReaderFactory.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.text.csv;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.io.FileReaderFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.collect.Iterators;
+
+/**
+ * The {@code FileReaderFactory} instance that is responsible for building a
+ * {@code CSVRecordIterator} for each file it is asked to read.
+ */
+public class CSVFileReaderFactory implements FileReaderFactory<String> {
+  private static final Log LOG = LogFactory.getLog(CSVFileReaderFactory.class);
+  private final int bufferSize;
+  private final String inputFileEncoding;
+  private final char openQuoteChar;
+  private final char closeQuoteChar;
+  private final char escapeChar;
+
+  /**
+   * Creates a new {@code CSVFileReaderFactory} instance that uses the
+   * {@code CSVLineReader} defaults for buffer size, input file encoding, quote
+   * characters (open and close) and escape character.
+   */
+  CSVFileReaderFactory() {
+    this(CSVLineReader.DEFAULT_BUFFER_SIZE, CSVLineReader.DEFAULT_INPUT_FILE_ENCODING,
+        CSVLineReader.DEFAULT_QUOTE_CHARACTER, CSVLineReader.DEFAULT_QUOTE_CHARACTER,
+        CSVLineReader.DEFAULT_ESCAPE_CHARACTER);
+  }
+
+  /**
+   * Creates a new {@code CSVFileReaderFactory} instance with custom
+   * configuration.
+   *
+   * @param bufferSize
+   *          The size of the buffer to be used in the underlying
+   *          {@code CSVLineReader}
+   * @param inputFileEncoding
+   *          The encoding of the input file to be read by the underlying
+   *          {@code CSVLineReader}
+   * @param openQuoteChar
+   *          The character that opens a quoted field in the underlying
+   *          {@code CSVLineReader}
+   * @param closeQuoteChar
+   *          The character that closes a quoted field in the underlying
+   *          {@code CSVLineReader}
+   * @param escapeChar
+   *          The character representing the escape character to be used in the
+   *          underlying {@code CSVLineReader}
+   */
+  CSVFileReaderFactory(final int bufferSize, final String inputFileEncoding, final char openQuoteChar,
+      final char closeQuoteChar, final char escapeChar) {
+    this.bufferSize = bufferSize;
+    this.inputFileEncoding = inputFileEncoding;
+    this.openQuoteChar = openQuoteChar;
+    this.closeQuoteChar = closeQuoteChar;
+    this.escapeChar = escapeChar;
+  }
+
+  /**
+   * Opens {@code path} and wraps the stream in a {@code CSVRecordIterator}.
+   * <p>
+   * If opening the file or building the iterator fails with an
+   * {@code IOException}, the failure is logged and an empty iterator is
+   * returned. Whenever the stream is not handed off to the iterator, for any
+   * reason, it is closed so it does not leak.
+   */
+  @Override
+  public Iterator<String> read(FileSystem fs, Path path) {
+    FSDataInputStream is = null;
+    boolean handedOff = false;
+    try {
+      is = fs.open(path);
+      final Iterator<String> records = new CSVRecordIterator(is, bufferSize, inputFileEncoding, openQuoteChar,
+          closeQuoteChar, escapeChar);
+      // From here on the iterator owns the stream.
+      handedOff = true;
+      return records;
+    } catch (IOException e) {
+      LOG.info("Could not read path: " + path, e);
+      return Iterators.emptyIterator();
+    } finally {
+      if (!handedOff) {
+        closeQuietly(is, path);
+      }
+    }
+  }
+
+  /** Closes {@code is} if it is non-null, logging rather than propagating any failure. */
+  private static void closeQuietly(final FSDataInputStream is, final Path path) {
+    if (is == null) {
+      return;
+    }
+    try {
+      is.close();
+    } catch (IOException e) {
+      LOG.warn("Could not close input stream for path: " + path, e);
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/crunch/blob/ce79a7a1/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileSource.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileSource.java b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileSource.java
new file mode 100644
index 0000000..9021f78
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVFileSource.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.text.csv;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.crunch.ReadableData;
+import org.apache.crunch.impl.mr.run.RuntimeParameters;
+import org.apache.crunch.io.FormatBundle;
+import org.apache.crunch.io.ReadableSource;
+import org.apache.crunch.io.impl.FileSourceImpl;
+import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * A {@code Source} instance that reads CSV files through the
+ * {@code CSVInputFormat}, which hands each CSV record to the pipeline as one
+ * single value, regardless of how many lines that record may span (e.g. a
+ * quoted field that contains newlines).
+ */
+public class CSVFileSource extends FileSourceImpl<String> implements ReadableSource<String> {
+
+  /**
+   * The key used in the {@code CSVInputFormat}'s {@code FormatBundle} to set
+   * the underlying {@code CSVLineReader}'s buffer size
+   */
+  public static final String CSV_BUFFER_SIZE = "csv.buffersize";
+
+  /**
+   * The key used in the {@code CSVInputFormat}'s {@code FormatBundle} to set
+   * the underlying {@code CSVLineReader}'s input file encoding
+   */
+  public static final String CSV_INPUT_FILE_ENCODING = "csv.inputfileencoding";
+
+  /**
+   * The key used in the {@code CSVInputFormat}'s {@code FormatBundle} to set
+   * the underlying {@code CSVLineReader}'s open quote character
+   */
+  public static final String CSV_OPEN_QUOTE_CHAR = "csv.openquotechar";
+
+  /**
+   * The key used in the {@code CSVInputFormat}'s {@code FormatBundle} to set
+   * the underlying {@code CSVLineReader}'s close quote character
+   */
+  public static final String CSV_CLOSE_QUOTE_CHAR = "csv.closequotechar";
+
+  /**
+   * The key used in the {@code CSVInputFormat}'s {@code FormatBundle} to set
+   * the underlying {@code CSVLineReader}'s escape character
+   */
+  public static final String CSV_ESCAPE_CHAR = "csv.escapechar";
+
+  private int bufferSize;
+  private String inputFileEncoding;
+  private char openQuoteChar;
+  private char closeQuoteChar;
+  private char escapeChar;
+
+  /**
+   * Create a new CSVFileSource instance that uses the {@code CSVLineReader}
+   * defaults for buffer size, input file encoding, quote characters and escape
+   * character.
+   *
+   * @param paths
+   *          A list of {@code Path}s to be used as input data
+   */
+  public CSVFileSource(List<Path> paths) {
+    this(paths, CSVLineReader.DEFAULT_BUFFER_SIZE, CSVLineReader.DEFAULT_INPUT_FILE_ENCODING,
+        CSVLineReader.DEFAULT_QUOTE_CHARACTER, CSVLineReader.DEFAULT_QUOTE_CHARACTER,
+        CSVLineReader.DEFAULT_ESCAPE_CHARACTER);
+  }
+
+  /**
+   * Create a new CSVFileSource instance that uses the {@code CSVLineReader}
+   * defaults for buffer size, input file encoding, quote characters and escape
+   * character.
+   *
+   * @param path
+   *          The {@code Path} to the input data
+   */
+  public CSVFileSource(Path path) {
+    this(path, CSVLineReader.DEFAULT_BUFFER_SIZE, CSVLineReader.DEFAULT_INPUT_FILE_ENCODING,
+        CSVLineReader.DEFAULT_QUOTE_CHARACTER, CSVLineReader.DEFAULT_QUOTE_CHARACTER,
+        CSVLineReader.DEFAULT_ESCAPE_CHARACTER);
+  }
+
+  /**
+   * Create a new CSVFileSource instance with all configurable options.
+   *
+   * @param paths
+   *          A list of {@code Path}s to be used as input data.
+   * @param bufferSize
+   *          The size of the buffer to be used in the underlying
+   *          {@code CSVLineReader}
+   * @param inputFileEncoding
+   *          The encoding of the input file to be read by the underlying
+   *          {@code CSVLineReader}; must not be {@code null}
+   * @param openQuoteChar
+   *          The character that opens a quoted field in the underlying
+   *          {@code CSVLineReader}
+   * @param closeQuoteChar
+   *          The character that closes a quoted field in the underlying
+   *          {@code CSVLineReader}
+   * @param escapeChar
+   *          The character representing the escape character to be used in the
+   *          underlying {@code CSVLineReader}
+   * @throws IllegalArgumentException
+   *           if either quote character equals the escape character
+   * @throws NullPointerException
+   *           if {@code inputFileEncoding} is {@code null}
+   */
+  public CSVFileSource(List<Path> paths, final int bufferSize, final String inputFileEncoding,
+      final char openQuoteChar, final char closeQuoteChar, final char escapeChar) {
+    super(paths, Writables.strings(), getCSVBundle(bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar,
+        escapeChar));
+    setPrivateVariables(bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar, escapeChar);
+  }
+
+  /**
+   * Create a new CSVFileSource instance with all configurable options.
+   *
+   * @param path
+   *          The {@code Path} to the input data
+   * @param bufferSize
+   *          The size of the buffer to be used in the underlying
+   *          {@code CSVLineReader}
+   * @param inputFileEncoding
+   *          The encoding of the input file to be read by the underlying
+   *          {@code CSVLineReader}; must not be {@code null}
+   * @param openQuoteChar
+   *          The character that opens a quoted field in the underlying
+   *          {@code CSVLineReader}
+   * @param closeQuoteChar
+   *          The character that closes a quoted field in the underlying
+   *          {@code CSVLineReader}
+   * @param escapeChar
+   *          The character representing the escape character to be used in the
+   *          underlying {@code CSVLineReader}
+   * @throws IllegalArgumentException
+   *           if either quote character equals the escape character
+   * @throws NullPointerException
+   *           if {@code inputFileEncoding} is {@code null}
+   */
+  public CSVFileSource(Path path, final int bufferSize, final String inputFileEncoding, final char openQuoteChar,
+      final char closeQuoteChar, final char escapeChar) {
+    super(path, Writables.strings(), getCSVBundle(bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar,
+        escapeChar));
+    setPrivateVariables(bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar, escapeChar);
+  }
+
+  /**
+   * Reads the CSV records from this source's paths using a
+   * {@code CSVFileReaderFactory} configured with this source's options.
+   */
+  @Override
+  public Iterable<String> read(Configuration conf) throws IOException {
+    return read(conf,
+        new CSVFileReaderFactory(bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar, escapeChar));
+  }
+
+  /** Returns a {@code CSVReadableData} over this source's paths with the same CSV options. */
+  @Override
+  public ReadableData<String> asReadable() {
+    return new CSVReadableData(paths, bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar, escapeChar);
+  }
+
+  @Override
+  public String toString() {
+    return "CSV(" + pathsAsString() + ")";
+  }
+
+  /**
+   * Builds the {@code FormatBundle} that carries the custom options to the job.
+   * These will be retrieved later by {@code CSVInputFormat}. Combining of input
+   * files is disabled via {@code RuntimeParameters.DISABLE_COMBINE_FILE}.
+   */
+  private static FormatBundle<CSVInputFormat> getCSVBundle(final int bufferSize, final String inputFileEncoding,
+      final char openQuoteChar, final char closeQuoteChar, final char escapeChar) {
+    FormatBundle<CSVInputFormat> bundle = FormatBundle.forInput(CSVInputFormat.class);
+    bundle.set(RuntimeParameters.DISABLE_COMBINE_FILE, "true");
+    bundle.set(CSV_BUFFER_SIZE, String.valueOf(bufferSize));
+    bundle.set(CSV_INPUT_FILE_ENCODING, String.valueOf(inputFileEncoding));
+    bundle.set(CSV_OPEN_QUOTE_CHAR, String.valueOf(openQuoteChar));
+    bundle.set(CSV_CLOSE_QUOTE_CHAR, String.valueOf(closeQuoteChar));
+    bundle.set(CSV_ESCAPE_CHAR, String.valueOf(escapeChar));
+    return bundle;
+  }
+
+  /**
+   * Validates the CSV options and stores them for {@link #read} and
+   * {@link #asReadable}.
+   */
+  private void setPrivateVariables(final int bufferSize, final String inputFileEncoding, final char openQuoteChar,
+      final char closeQuoteChar, final char escapeChar) {
+    if (isSameCharacter(openQuoteChar, escapeChar)) {
+      throw new IllegalArgumentException("The open quote (" + openQuoteChar + ") and escape (" + escapeChar
+          + ") characters must be different!");
+    }
+    if (isSameCharacter(closeQuoteChar, escapeChar)) {
+      throw new IllegalArgumentException("The close quote (" + closeQuoteChar + ") and escape (" + escapeChar
+          + ") characters must be different!");
+    }
+    // A null encoding would otherwise be serialized into the bundle as the
+    // literal string "null" and only fail once a task tries to decode input.
+    if (inputFileEncoding == null) {
+      throw new NullPointerException("The input file encoding must not be null");
+    }
+    this.bufferSize = bufferSize;
+    this.inputFileEncoding = inputFileEncoding;
+    this.openQuoteChar = openQuoteChar;
+    this.closeQuoteChar = closeQuoteChar;
+    this.escapeChar = escapeChar;
+  }
+
+  private boolean isSameCharacter(final char c1, final char c2) {
+    return c2 == c1;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/crunch/blob/ce79a7a1/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVInputFormat.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVInputFormat.java b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVInputFormat.java
new file mode 100644
index 0000000..5d5abe3
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVInputFormat.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.text.csv;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * A {@link FileInputFormat} for use specifically with CSV files. This input
+ * format deals with the fact that CSV files can potentially have multiple lines
+ * within fields which should all be treated as one record.
+ */
+public class CSVInputFormat extends FileInputFormat<LongWritable, Text> {
+  /** Configuration key for the approximate size, in bytes, of each split. */
+  private static final String CSV_INPUT_SPLIT_SIZE = "csv.input.split.size";
+  /** Default split size: 64 MiB. */
+  private static final long DEFAULT_CSV_INPUT_SPLIT_SIZE = 64L * 1024 * 1024;
+
+  // Initialized to the CSVLineReader defaults so an instance whose
+  // configure(JobConf) was never invoked does not fall back to Java defaults
+  // (a zero-sized buffer, a null encoding and NUL quote/escape characters).
+  // NOTE(review): this class implements neither JobConfigurable nor
+  // Configurable, so confirm that something actually calls configure().
+  private int bufferSize = CSVLineReader.DEFAULT_BUFFER_SIZE;
+  private String inputFileEncoding = CSVLineReader.DEFAULT_INPUT_FILE_ENCODING;
+  private char openQuoteChar = CSVLineReader.DEFAULT_QUOTE_CHARACTER;
+  private char closeQuoteChar = CSVLineReader.DEFAULT_QUOTE_CHARACTER;
+  private char escapeChar = CSVLineReader.DEFAULT_ESCAPE_CHARACTER;
+
+  /**
+   * Creates the {@link CSVRecordReader} used to read records from a split. It
+   * is configured with this input format's buffer size, input file encoding,
+   * open/close quote characters and escape character.
+   *
+   * @param split
+   *          the {@link InputSplit} that will be assigned to the record reader
+   * @param context
+   *          the {@link TaskAttemptContext} for the job
+   * @return a new {@link CSVRecordReader}
+   */
+  @Override
+  public RecordReader<LongWritable, Text> createRecordReader(final InputSplit split, final TaskAttemptContext context) {
+    return new CSVRecordReader(this.bufferSize, this.inputFileEncoding, this.openQuoteChar, this.closeQuoteChar,
+        this.escapeChar);
+  }
+
+  /**
+   * A method used by crunch to calculate the splits for every input file. This
+   * will split each CSV file at the end of a valid CSV record.
+   *
+   * @param job
+   *          the {@link JobContext} for the current job.
+   * @return a List containing all of the calculated splits for all input files.
+   * @throws IOException
+   *           if an error occurs while accessing HDFS
+   * @throws IllegalArgumentException
+   *           if {@code csv.input.split.size} is not positive
+   */
+  @Override
+  public List<InputSplit> getSplits(final JobContext job) throws IOException {
+    final long splitSize = job.getConfiguration().getLong(CSV_INPUT_SPLIT_SIZE, DEFAULT_CSV_INPUT_SPLIT_SIZE);
+    if (splitSize <= 0) {
+      throw new IllegalArgumentException(CSV_INPUT_SPLIT_SIZE + " must be positive, but was " + splitSize);
+    }
+    final List<InputSplit> splits = new ArrayList<InputSplit>();
+    final Path[] paths = FileUtil.stat2Paths(listStatus(job).toArray(new FileStatus[0]));
+    for (final Path path : paths) {
+      final FileSystem fileSystem = path.getFileSystem(job.getConfiguration());
+      // Close each file's stream as soon as its splits are computed. Closing
+      // only the last stream in an outer finally leaked every other stream, and
+      // threw a NullPointerException when there were no input paths.
+      final FSDataInputStream inputStream = fileSystem.open(path);
+      try {
+        splits.addAll(getSplitsForFile(splitSize, fileSystem.getFileStatus(path).getLen(), path, inputStream));
+      } finally {
+        inputStream.close();
+      }
+    }
+    return splits;
+  }
+
+  /**
+   * In summary, this method starts at the beginning of the file and seeks to
+   * the position corresponding to the desired split size. It then seeks to the
+   * end of the line that contains that position, and keeps reading until the
+   * CSVLineReader indicates that the current position is no longer within a CSV
+   * record. Then, it marks that position for a split and repeats its logic. An
+   * empty file yields a single zero-length split.
+   */
+  @VisibleForTesting
+  protected List<FileSplit> getSplitsForFile(final long splitSize, final long fileSize, final Path fileName,
+      final FSDataInputStream inputStream) throws IOException {
+    final List<FileSplit> splitsList = new ArrayList<FileSplit>();
+
+    long splitStart = 0;
+    while (true) {
+      // If the rest of the file fits into one split, emit it and stop. This is
+      // written as a subtraction so a very large splitSize cannot overflow, and
+      // it avoids seeking past EOF, which the input stream rejects.
+      if (fileSize - splitStart <= splitSize) {
+        splitsList.add(new FileSplit(fileName, splitStart, fileSize - splitStart, new String[] {}));
+        return splitsList;
+      }
+
+      // Skip a number of bytes equal to the desired split size to avoid parsing
+      // every csv line, which greatly increases the run time.
+      long currentPosition = splitStart + splitSize;
+
+      // Every time we seek to the new approximate split point,
+      // we need to create a new CSVLineReader around the stream.
+      inputStream.seek(currentPosition);
+      final CSVLineReader csvLineReader = new CSVLineReader(inputStream, this.bufferSize, this.inputFileEncoding,
+          this.openQuoteChar, this.closeQuoteChar, this.escapeChar);
+
+      // This line is potentially garbage because we most likely just sought to
+      // the middle of a line. Read the rest of the line and leave it for the
+      // previous split. Then reset the multi-line CSV record boolean, because
+      // the partial line will have a very high chance of falsely triggering the
+      // class wide multi-line logic.
+      currentPosition += csvLineReader.readFileLine(new Text());
+      csvLineReader.resetMultiLine();
+
+      // Now, we may still be in the middle of a multi-line CSV record.
+      currentPosition += csvLineReader.readFileLine(new Text());
+
+      // If we are, read until we are not.
+      while (csvLineReader.isInMultiLine()) {
+        final int bytesRead = csvLineReader.readFileLine(new Text());
+        // End of file
+        if (bytesRead <= 0) {
+          break;
+        }
+        currentPosition += bytesRead;
+      }
+
+      // We're out of the multi-line CSV record, so it's safe to end the
+      // previous split.
+      splitsList.add(new FileSplit(fileName, splitStart, currentPosition - splitStart, new String[] {}));
+
+      // If the record-boundary search consumed the rest of the file, stop here
+      // rather than emitting a trailing zero-length split.
+      if (currentPosition >= fileSize) {
+        return splitsList;
+      }
+      splitStart = currentPosition;
+    }
+  }
+
+  /**
+   * Reads the configuration that is set by {@link CSVFileSource}'s private
+   * getCSVBundle() method. Any key that is missing or empty falls back to the
+   * corresponding {@code CSVLineReader} default.
+   *
+   * @param jobConf
+   *          The {@code JobConf} instance from which the CSV configuration
+   *          parameters will be read, if necessary.
+   * @throws NumberFormatException
+   *           if the buffer size is set but is not a valid integer
+   */
+  public void configure(JobConf jobConf) {
+    final String bufferValue = nonEmptyValue(jobConf, CSVFileSource.CSV_BUFFER_SIZE);
+    bufferSize = bufferValue == null ? CSVLineReader.DEFAULT_BUFFER_SIZE : Integer.parseInt(bufferValue);
+
+    final String inputFileEncodingValue = nonEmptyValue(jobConf, CSVFileSource.CSV_INPUT_FILE_ENCODING);
+    inputFileEncoding = inputFileEncodingValue == null ? CSVLineReader.DEFAULT_INPUT_FILE_ENCODING
+        : inputFileEncodingValue;
+
+    openQuoteChar = charOrDefault(jobConf, CSVFileSource.CSV_OPEN_QUOTE_CHAR, CSVLineReader.DEFAULT_QUOTE_CHARACTER);
+    closeQuoteChar = charOrDefault(jobConf, CSVFileSource.CSV_CLOSE_QUOTE_CHAR,
+        CSVLineReader.DEFAULT_QUOTE_CHARACTER);
+    escapeChar = charOrDefault(jobConf, CSVFileSource.CSV_ESCAPE_CHAR, CSVLineReader.DEFAULT_ESCAPE_CHARACTER);
+  }
+
+  /**
+   * Returns the value of {@code key}, or {@code null} when it is unset or empty.
+   * {@code JobConf.get} returns null for unset keys, so that case must be
+   * treated like an empty value.
+   */
+  private static String nonEmptyValue(final JobConf jobConf, final String key) {
+    final String value = jobConf.get(key);
+    return (value == null || value.isEmpty()) ? null : value;
+  }
+
+  /**
+   * Returns the first character of {@code key}'s value, or {@code defaultChar}
+   * when the key is unset or empty.
+   */
+  private static char charOrDefault(final JobConf jobConf, final String key, final char defaultChar) {
+    final String value = nonEmptyValue(jobConf, key);
+    return value == null ? defaultChar : value.charAt(0);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/crunch/blob/ce79a7a1/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVLineReader.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVLineReader.java b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVLineReader.java
new file mode 100644
index 0000000..83e2abe
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVLineReader.java
@@ -0,0 +1,406 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.text.csv;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.UnsupportedEncodingException;
+import java.nio.CharBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetEncoder;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.apache.hadoop.io.Text;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A record reader written specifically to read individual lines from CSV files.
+ * Most notably, it can read CSV records which span multiple lines: a newline
+ * found while inside a quoted field is kept as part of the record.
+ * <p>
+ * Byte counts returned by the read methods are computed by re-encoding each
+ * decoded character with the configured encoding.
+ * <p>
+ * Instances hold mutable parsing state (buffer position, quote and escape
+ * state) without synchronization, so an instance should be used by one thread.
+ */
+@ParametersAreNonnullByDefault
+public class CSVLineReader {
+
+ // InputStream related variables
+ /**
+  * The default buffer size (64k) to be used when reading from the InputStream
+  */
+ public static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
+ private static final char CR = '\r';
+ private static final char LF = '\n';
+ private final InputStreamReader inputStreamReader;
+ private final String inputFileEncoding;
+ private final CharsetEncoder charsetEncoder;
+ // UTF-8 byte lengths can be computed arithmetically, avoiding an encoder call per character.
+ private final boolean isUtf8;
+ private final char[] buffer;
+ private final int bufferSize;
+ // Number of valid characters in the buffer after the last refill (<= 0 once the stream is exhausted).
+ private int bufferLength = 0;
+ private int bufferPosition = 0;
+ private boolean endOfFile = false;
+ // Encoded length of one surrogate pair; computed lazily, -1 until first needed.
+ private int surrogatePairByteLength = -1;
+
+ // CSV parsing related variables
+ /**
+  * The default character to represent quotation marks, '"'
+  */
+ public static final char DEFAULT_QUOTE_CHARACTER = '"';
+ /**
+  * The default character to represent an escape used before a control
+  * character that should be displayed, '\'
+  */
+ public static final char DEFAULT_ESCAPE_CHARACTER = '\\';
+ /**
+  * The default character to represent a null character, '\0'
+  */
+ public static final char NULL_CHARACTER = '\0';
+ /**
+  * The default input file encoding to read with, UTF-8
+  */
+ public static final String DEFAULT_INPUT_FILE_ENCODING = "UTF-8";
+ private final char openQuoteChar;
+ private final char closeQuoteChar;
+ private final char escape;
+ private boolean inMultiLine = false;
+ private boolean currentlyInQuotes = false;
+ // True when the previous character was an escape read inside quotes, so a following
+ // quote or escape character is taken literally. Kept as state (rather than looking
+ // ahead) so an escape that ends one buffer fill still applies to the next fill.
+ private boolean escapePending = false;
+ private boolean endOfLineReached = false;
+
+ /**
+  * This constructor will use default values for buffer size and control
+  * characters.
+  *
+  * @param inputStream
+  *          The {@link InputStream} to read from. Note that this input stream
+  *          should start at the very beginning of the CSV file to be read OR
+  *          at the very beginning of a CSV entry. If the input stream starts
+  *          at any other position (such as in the middle of a line) this
+  *          reader will not work properly.
+  * @throws UnsupportedEncodingException
+  *           declared for compatibility; an unsupported encoding is reported
+  *           as a {@link RuntimeException} by the delegated-to constructor
+  */
+ public CSVLineReader(final InputStream inputStream) throws UnsupportedEncodingException {
+   this(inputStream, DEFAULT_BUFFER_SIZE, DEFAULT_INPUT_FILE_ENCODING, DEFAULT_QUOTE_CHARACTER,
+       DEFAULT_QUOTE_CHARACTER, DEFAULT_ESCAPE_CHARACTER);
+ }
+
+ /**
+  * The fully customizable constructor for CSVLineReader
+  *
+  * @param inputStream
+  *          The {@link InputStream} to read from. Note that this input stream
+  *          should start at the very beginning of the CSV file to be read OR
+  *          at the very beginning of a CSV entry. If the input stream starts
+  *          at any other position (such as in the middle of a line) this
+  *          reader will not work properly.
+  * @param bufferSize
+  *          The size (in characters) of the buffer used when reading the input stream
+  * @param inputFileEncoding
+  *          The encoding of the file to read from.
+  * @param openQuoteChar
+  *          Used to specify a custom open quote character
+  * @param closeQuoteChar
+  *          Used to specify a custom close quote character
+  * @param escapeChar
+  *          Used to specify a custom escape character
+  * @throws NullPointerException
+  *           if {@code inputStream} or {@code inputFileEncoding} is null
+  * @throws IllegalArgumentException
+  *           if {@code bufferSize <= 0}, or if either quote character equals
+  *           the (non-NUL) escape character
+  * @throws RuntimeException
+  *           if {@code inputFileEncoding} is not a supported encoding
+  */
+ public CSVLineReader(final InputStream inputStream, final int bufferSize, final String inputFileEncoding,
+     final char openQuoteChar, final char closeQuoteChar, final char escapeChar) {
+   Preconditions.checkNotNull(inputStream, "inputStream may not be null");
+   Preconditions.checkNotNull(inputFileEncoding, "inputFileEncoding may not be null");
+   if (bufferSize <= 0) {
+     throw new IllegalArgumentException("The buffer (" + bufferSize + ")cannot be <= 0");
+   }
+
+   // Input Stream related variables
+   try {
+     this.inputStreamReader = new InputStreamReader(inputStream, inputFileEncoding);
+   } catch (final UnsupportedEncodingException uee) {
+     throw new RuntimeException(inputFileEncoding + " is not a supported encoding.", uee);
+   }
+   this.bufferSize = bufferSize;
+   this.buffer = new char[this.bufferSize];
+
+   // CSV parsing related variables
+   if (isSameCharacter(openQuoteChar, escapeChar)) {
+     throw new IllegalArgumentException("The open quote (" + openQuoteChar + ") and escape (" + escapeChar
+         + ") characters must be different!");
+   }
+   if (isSameCharacter(closeQuoteChar, escapeChar)) {
+     throw new IllegalArgumentException("The close quote (" + closeQuoteChar + ") and escape (" + escapeChar
+         + ") characters must be different!");
+   }
+   this.openQuoteChar = openQuoteChar;
+   this.closeQuoteChar = closeQuoteChar;
+   this.escape = escapeChar;
+   this.inputFileEncoding = inputFileEncoding;
+   final Charset charset = Charset.forName(inputFileEncoding);
+   this.charsetEncoder = charset.newEncoder();
+   this.isUtf8 = "UTF-8".equals(charset.name());
+ }
+
+ /**
+  * Reads one full CSV record, places its content into {@code input} and
+  * returns the number of bytes (including newline characters) that were
+  * consumed. Newlines inside quoted fields are kept in the record as a single
+  * LF each; the terminator that ends the record is not included. If the stream
+  * ends inside an unterminated quoted field, the partial record read so far is
+  * returned instead of waiting for a closing quote that will never come.
+  *
+  * @param input
+  *          a mutable {@link Text} object whose content is replaced by the
+  *          text of the CSV record; left untouched if a previous call already
+  *          reached the end of the stream
+  * @return the number of bytes that were read, including any control
+  *         characters, line feeds, or carriage returns; 0 at end of stream
+  * @throws IOException
+  *           if an IOException occurs while handling the file to be read, or
+  *           if the record is longer than {@link Integer#MAX_VALUE} bytes
+  * @throws RuntimeException
+  *           if called while a multi-line record opened through
+  *           {@link #readFileLine(Text)} has not been finished or reset
+  */
+ public int readCSVLine(final Text input) throws IOException {
+   Preconditions.checkNotNull(input, "inputText may not be null");
+   if (endOfFile) {
+     return 0;
+   }
+   if (inMultiLine) {
+     throw new RuntimeException("Cannot begin reading a CSV record while inside of a multi-line CSV record.");
+   }
+
+   final StringBuilder record = new StringBuilder();
+   long totalBytesConsumed = 0;
+   do {
+     totalBytesConsumed += readLineInto(record);
+     // A physical line ended inside quotes: keep one LF so the multi-line record is
+     // not collapsed onto a single line.
+     if (currentlyInQuotes && !endOfFile) {
+       record.append(LF);
+     }
+     // Stop at end of stream even if a quote is still open, otherwise this would spin forever.
+   } while (currentlyInQuotes && !endOfFile);
+
+   if (totalBytesConsumed > Integer.MAX_VALUE) {
+     throw new IOException("Too many bytes consumed before newline: " + Integer.MAX_VALUE);
+   }
+
+   input.set(record.toString());
+   return (int) totalBytesConsumed;
+ }
+
+ /**
+  * Reads one single physical line of the CSV file, that is, it reads until the
+  * first line feed, carriage return, or CRLF pair is found. The text of the
+  * line (without the terminator) is APPENDED to the current content of
+  * {@code input}. The CSV parsing markers are maintained outside of this
+  * method to enable manipulating that logic in order to find the beginning of
+  * a CSV record. Use {@link CSVLineReader#isInMultiLine()} and
+  * {@link CSVLineReader#resetMultiLine()} to do so. See
+  * {@link CSVInputFormat#getSplitsForFile(long, long, org.apache.hadoop.fs.Path, org.apache.hadoop.fs.FSDataInputStream)}
+  * for an example.
+  *
+  * @param input
+  *          a mutable {@link Text} object to which the text of the line will
+  *          be appended, without any line feeds or carriage returns
+  * @return the number of bytes that were read, including any control
+  *         characters, line feeds, or carriage returns; 0 at end of stream
+  * @throws IOException
+  *           if an IOException occurs while handling the file to be read
+  */
+ public int readFileLine(final Text input) throws IOException {
+   Preconditions.checkNotNull(input, "inputText may not be null");
+   if (endOfFile) {
+     return 0;
+   }
+   final StringBuilder line = new StringBuilder(input.toString());
+   final int bytesConsumed = readLineInto(line);
+   input.set(line.toString());
+   return bytesConsumed;
+ }
+
+ /**
+  * For use with {@link CSVLineReader#readFileLine(Text)}. Returns current
+  * multi-line CSV status.
+  *
+  * @return a boolean signifying if the last
+  *         {@link CSVLineReader#readFileLine(Text)} call ended in the middle
+  *         of a multi-line CSV record
+  */
+ public boolean isInMultiLine() {
+   return inMultiLine;
+ }
+
+ /**
+  * For use with {@link CSVLineReader#readFileLine(Text)}. Resets current
+  * multi-line CSV status, including any open quote or pending escape.
+  */
+ public void resetMultiLine() {
+   inMultiLine = false;
+   currentlyInQuotes = false;
+   escapePending = false;
+ }
+
+ /**
+  * Reads one physical line, appending its text (without terminator) to
+  * {@code line}, and returns the number of bytes consumed including the
+  * terminator. Updates the quote, escape, and multi-line state.
+  */
+ private int readLineInto(final StringBuilder line) throws IOException {
+   if (endOfFile) {
+     return 0;
+   }
+
+   // Number of newline characters terminating this line: 1 for LF or CR, 2 for CRLF.
+   int newlineLength = 0;
+   int appendedLength = 0;
+   long bytesConsumed = 0;
+   int startPosition = bufferPosition;
+   endOfLineReached = false;
+
+   do {
+     boolean checkForLF = false;
+     // Figure out where we are in the buffer and fill it if necessary.
+     if (bufferPosition >= bufferLength) {
+       refillBuffer();
+       startPosition = bufferPosition;
+       if (endOfFile) {
+         break;
+       }
+     }
+
+     newlineLength = 0;
+     // Scan only the valid part of the buffer for a line terminator, tracking quote and escape state.
+     for (; bufferPosition < bufferLength; ++bufferPosition) {
+       final char current = buffer[bufferPosition];
+       bytesConsumed += calculateCharacterByteLength(current);
+       if (escapePending) {
+         escapePending = false;
+         if (isEscapable(current)) {
+           // Escaped quote or escape character: plain text, no effect on parsing.
+           continue;
+         }
+       }
+       if (current == escape) {
+         // Escapes only have meaning inside quotes.
+         escapePending = currentlyInQuotes;
+       } else if (current == openQuoteChar || current == closeQuoteChar) {
+         // toggle currentlyInQuotes if we've hit a non-escaped quote character
+         currentlyInQuotes = !currentlyInQuotes;
+       } else if (current == LF || current == CR) {
+         endOfLineReached = true;
+         ++newlineLength;
+         ++bufferPosition;
+         if (current == CR) {
+           if (bufferPosition < bufferLength) {
+             if (buffer[bufferPosition] == LF) {
+               // CRLF: count the LF, then step past it.
+               bytesConsumed += calculateCharacterByteLength(LF);
+               ++newlineLength;
+               ++bufferPosition;
+             }
+           } else {
+             // The CR was the last valid character; a matching LF would be in the next fill.
+             checkForLF = true;
+           }
+         }
+         break;
+       }
+     }
+     // Length of this segment's text, excluding any line terminator.
+     int readTextLength = bufferPosition - startPosition - newlineLength;
+     if (readTextLength > Integer.MAX_VALUE - appendedLength) {
+       readTextLength = Integer.MAX_VALUE - appendedLength;
+     }
+     if (readTextLength > 0) {
+       line.append(buffer, startPosition, readTextLength);
+       appendedLength += readTextLength;
+     }
+
+     if (checkForLF) {
+       refillBuffer();
+       if (endOfFile) {
+         break;
+       }
+       // Not at end of stream, so the refill produced at least one valid character.
+       if (buffer[bufferPosition] == LF) {
+         bytesConsumed += calculateCharacterByteLength(LF);
+         ++bufferPosition;
+         ++newlineLength;
+       }
+     }
+   } while (newlineLength == 0 && bytesConsumed < Integer.MAX_VALUE);
+
+   if (endOfLineReached) {
+     inMultiLine = currentlyInQuotes;
+   }
+
+   if (bytesConsumed > Integer.MAX_VALUE) {
+     throw new IOException("Too many bytes consumed before newline: " + Integer.MAX_VALUE);
+   }
+   return (int) bytesConsumed;
+ }
+
+ private boolean isSameCharacter(final char c1, final char c2) {
+   return c1 != NULL_CHARACTER && c1 == c2;
+ }
+
+ /** Returns true if {@code c} may follow an escape inside quotes (a quote or the escape itself). */
+ private boolean isEscapable(final char c) {
+   return c == closeQuoteChar || c == openQuoteChar || c == escape;
+ }
+
+ /** Refills the buffer from the reader; sets endOfFile once the reader is exhausted. */
+ private void refillBuffer() throws IOException {
+   bufferPosition = 0;
+   bufferLength = inputStreamReader.read(buffer, 0, buffer.length);
+   if (bufferLength <= 0) {
+     endOfFile = true;
+   }
+ }
+
+ /**
+  * Returns the number of bytes {@code character} occupies in the input
+  * encoding. A surrogate pair is attributed entirely to its high surrogate
+  * (the low surrogate counts 0), because a lone surrogate cannot be encoded.
+  * This assumes the decoder only emits surrogates as well-formed pairs.
+  */
+ private int calculateCharacterByteLength(final char character) {
+   if (Character.isHighSurrogate(character)) {
+     return surrogatePairLength();
+   }
+   if (Character.isLowSurrogate(character)) {
+     return 0;
+   }
+   if (isUtf8) {
+     if (character < 0x80) {
+       return 1;
+     }
+     return character < 0x800 ? 2 : 3;
+   }
+   return encodedLength(new char[] { character }, character);
+ }
+
+ /** Returns (and caches) the encoded byte length of one surrogate pair. */
+ private int surrogatePairLength() {
+   if (surrogatePairByteLength < 0) {
+     surrogatePairByteLength = isUtf8 ? 4 : encodedLength(
+         new char[] { Character.MIN_HIGH_SURROGATE, Character.MIN_LOW_SURROGATE }, Character.MIN_HIGH_SURROGATE);
+   }
+   return surrogatePairByteLength;
+ }
+
+ /** Encodes {@code characters} with the input encoding and returns the resulting byte count. */
+ private int encodedLength(final char[] characters, final char reported) {
+   try {
+     return charsetEncoder.encode(CharBuffer.wrap(characters)).limit();
+   } catch (final CharacterCodingException e) {
+     throw new RuntimeException("The character attempting to be read (" + reported + ") could not be encoded with "
+         + inputFileEncoding, e);
+   }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/crunch/blob/ce79a7a1/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVReadableData.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVReadableData.java b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVReadableData.java
new file mode 100644
index 0000000..63e74d9
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVReadableData.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.text.csv;
+
+import java.util.List;
+
+import org.apache.crunch.io.FileReaderFactory;
+import org.apache.crunch.io.impl.ReadableDataImpl;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * A {@link ReadableDataImpl} that reads CSV records (as {@code String}s) from
+ * the given paths through a {@link CSVFileReaderFactory} configured with the
+ * buffer size, encoding, and quote/escape characters given at construction.
+ */
+public class CSVReadableData extends ReadableDataImpl<String> {
+
+ private final int bufferSize;
+ private final String inputFileEncoding;
+ private final char openQuoteChar;
+ private final char closeQuoteChar;
+ private final char escapeChar;
+
+ /**
+  * Creates an instance of {@code CSVReadableData} with default configuration
+  *
+  * @param paths
+  *          The paths of the files to be read
+  */
+ protected CSVReadableData(List<Path> paths) {
+   this(paths, CSVLineReader.DEFAULT_BUFFER_SIZE, CSVLineReader.DEFAULT_INPUT_FILE_ENCODING,
+       CSVLineReader.DEFAULT_QUOTE_CHARACTER, CSVLineReader.DEFAULT_QUOTE_CHARACTER,
+       CSVLineReader.DEFAULT_ESCAPE_CHARACTER);
+ }
+
+ /**
+  * Creates an instance of {@code CSVReadableData} with specified configuration
+  *
+  * @param paths
+  *          a list of input file paths
+  * @param bufferSize
+  *          the size of the buffer to use while parsing through the input file
+  * @param inputFileEncoding
+  *          the encoding for the input file
+  * @param openQuoteChar
+  *          the character to use to open quote blocks
+  * @param closeQuoteChar
+  *          the character to use to close quote blocks
+  * @param escapeChar
+  *          the character to use for escaping control characters and quotes
+  */
+ protected CSVReadableData(List<Path> paths, final int bufferSize, final String inputFileEncoding,
+     final char openQuoteChar, final char closeQuoteChar, final char escapeChar) {
+   super(paths);
+   this.bufferSize = bufferSize;
+   this.inputFileEncoding = inputFileEncoding;
+   this.openQuoteChar = openQuoteChar;
+   this.closeQuoteChar = closeQuoteChar;
+   this.escapeChar = escapeChar;
+ }
+
+ /**
+  * Returns a new {@link CSVFileReaderFactory} carrying this instance's CSV
+  * parsing configuration.
+  */
+ @Override
+ protected FileReaderFactory<String> getFileReaderFactory() {
+   return new CSVFileReaderFactory(bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar, escapeChar);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/crunch/blob/ce79a7a1/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordIterator.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordIterator.java b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordIterator.java
new file mode 100644
index 0000000..df5e39c
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordIterator.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.text.csv;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.hadoop.io.Text;
+
+import com.google.common.io.Closeables;
+
+/**
+ * An {@code Iterator} over the CSV records of an {@code InputStream}, backed by
+ * an internally created {@code CSVLineReader}. Records are read one ahead; the
+ * stream is closed when {@link #hasNext()} finds no further record, or when
+ * {@link #close()} is called. Blank lines are returned as empty records.
+ */
+public class CSVRecordIterator implements Iterator<String>, Closeable {
+ private final CSVLineReader csvLineReader;
+ private InputStream inputStream;
+ // The record the next call to next() returns; null once the stream is exhausted.
+ private String currentLine;
+
+ /**
+  * Creates an instance of {@code CSVRecordIterator} with default configuration
+  *
+  * @param inputStream
+  *          The {@code InputStream} for the CSV file to iterate over
+  * @throws UnsupportedEncodingException
+  *           declared for compatibility
+  */
+ public CSVRecordIterator(final InputStream inputStream) throws UnsupportedEncodingException {
+   this(inputStream, CSVLineReader.DEFAULT_BUFFER_SIZE, CSVLineReader.DEFAULT_INPUT_FILE_ENCODING,
+       CSVLineReader.DEFAULT_QUOTE_CHARACTER, CSVLineReader.DEFAULT_QUOTE_CHARACTER,
+       CSVLineReader.DEFAULT_ESCAPE_CHARACTER);
+ }
+
+ /**
+  * Creates an instance of {@code CSVRecordIterator} with custom configuration
+  *
+  * @param inputStream
+  *          The {@code InputStream} for the CSV file to iterate over
+  * @param bufferSize
+  *          The size of the buffer used when reading the input stream
+  * @param inputFileEncoding
+  *          the encoding for the input file
+  * @param openQuoteChar
+  *          the character to use to open quote blocks
+  * @param closeQuoteChar
+  *          the character to use to close quote blocks
+  * @param escapeChar
+  *          the character to use for escaping control characters and quotes
+  * @throws UnsupportedEncodingException
+  *           declared for compatibility
+  */
+ public CSVRecordIterator(final InputStream inputStream, final int bufferSize, final String inputFileEncoding,
+     final char openQuoteChar, final char closeQuoteChar, final char escapeChar) throws UnsupportedEncodingException {
+   csvLineReader = new CSVLineReader(inputStream, bufferSize, inputFileEncoding, openQuoteChar, closeQuoteChar,
+       escapeChar);
+   this.inputStream = inputStream;
+   incrementValue();
+ }
+
+ /**
+  * Returns true if another record is available; closes the stream otherwise.
+  */
+ @Override
+ public boolean hasNext() {
+   if (currentLine != null) {
+     return true;
+   }
+   Closeables.closeQuietly(this);
+   return false;
+ }
+
+ /**
+  * Returns the next CSV record.
+  *
+  * @throws NoSuchElementException
+  *           if there are no more records
+  */
+ @Override
+ public String next() {
+   if (currentLine == null) {
+     throw new NoSuchElementException();
+   }
+   final String result = currentLine;
+   incrementValue();
+   return result;
+ }
+
+ /**
+  * Not supported: records read from a stream cannot be removed.
+  *
+  * @throws UnsupportedOperationException
+  *           always
+  */
+ @Override
+ public void remove() {
+   throw new UnsupportedOperationException("CSVRecordIterator does not support remove()");
+ }
+
+ private void incrementValue() {
+   final Text tempText = new Text();
+   final int bytesRead;
+   try {
+     bytesRead = csvLineReader.readCSVLine(tempText);
+   } catch (IOException e) {
+     throw new RuntimeException("A problem occurred accessing the underlying CSV file stream.", e);
+   }
+   // readCSVLine consumes at least the newline of a blank line, so only a 0-byte read marks
+   // the end of the stream; an empty record is still a record.
+   currentLine = bytesRead == 0 ? null : tempText.toString();
+ }
+
+ /**
+  * Closes the underlying input stream; further calls have no effect.
+  */
+ @Override
+ public void close() throws IOException {
+   if (inputStream != null) {
+     inputStream.close();
+     inputStream = null;
+   }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/crunch/blob/ce79a7a1/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordReader.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordReader.java b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordReader.java
new file mode 100644
index 0000000..d04da98
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVRecordReader.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.text.csv;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * An extension of {@link RecordReader} used to intelligently read CSV files.
+ * Each value is one CSV record (possibly spanning several physical lines) as
+ * read by a {@link CSVLineReader}; each key is the byte offset, as counted by
+ * that reader, at which the record starts. Records are read while the current
+ * offset is before the end of the split, so a record starting before the split
+ * end is read to completion.
+ */
+public class CSVRecordReader extends RecordReader<LongWritable, Text> {
+ private static final Logger LOGGER = LoggerFactory.getLogger(CSVRecordReader.class);
+ private long start;
+ private long pos;
+ private long end;
+ private LongWritable key = null;
+ private Text value = null;
+
+ private FSDataInputStream fileIn;
+ private CSVLineReader csvLineReader;
+ private final char openQuote;
+ private final char closeQuote;
+ private final char escape;
+ private final String inputFileEncoding;
+ private final int fileStreamBufferSize;
+
+ private int totalRecordsRead = 0;
+
+ /**
+  * Default constructor, specifies default values for the {@link CSVLineReader}
+  */
+ public CSVRecordReader() {
+   this(CSVLineReader.DEFAULT_BUFFER_SIZE, CSVLineReader.DEFAULT_INPUT_FILE_ENCODING,
+       CSVLineReader.DEFAULT_QUOTE_CHARACTER, CSVLineReader.DEFAULT_QUOTE_CHARACTER,
+       CSVLineReader.DEFAULT_ESCAPE_CHARACTER);
+ }
+
+ /**
+  * Customizable constructor used to specify all input parameters for the
+  * {@link CSVLineReader}
+  *
+  * @param bufferSize
+  *          the size of the buffer to use while parsing through the input file
+  * @param inputFileEncoding
+  *          the encoding for the input file
+  * @param openQuote
+  *          the character to use to open quote blocks
+  * @param closeQuote
+  *          the character to use to close quote blocks
+  * @param escape
+  *          the character to use for escaping control characters and quotes
+  * @throws NullPointerException
+  *           if {@code inputFileEncoding} is null
+  */
+ public CSVRecordReader(final int bufferSize, final String inputFileEncoding, final char openQuote,
+     final char closeQuote, final char escape) {
+   // The char parameters are primitives and can never be null; only the encoding needs checking.
+   Preconditions.checkNotNull(inputFileEncoding, "inputFileEncoding cannot be null.");
+
+   this.fileStreamBufferSize = bufferSize;
+   this.inputFileEncoding = inputFileEncoding;
+   this.openQuote = openQuote;
+   this.closeQuote = closeQuote;
+   this.escape = escape;
+ }
+
+ /**
+  * Initializes the record reader
+  *
+  * @param genericSplit
+  *          the split assigned to this record reader
+  * @param context
+  *          the job context for this record reader
+  * @throws IOException
+  *           if an IOException occurs while handling the file to be read
+  */
+ @Override
+ public void initialize(final InputSplit genericSplit, final TaskAttemptContext context) throws IOException {
+   final FileSplit split = (FileSplit) genericSplit;
+   final Configuration job = context.getConfiguration();
+
+   start = split.getStart();
+   end = start + split.getLength();
+   this.pos = start;
+
+   final Path file = split.getPath();
+   LOGGER.info("Initializing processing of split for file: {}", file);
+   LOGGER.info("File size is: {}", file.getFileSystem(job).getFileStatus(file).getLen());
+   LOGGER.info("Split starts at: {}", start);
+   LOGGER.info("Split will end at: {}", end);
+
+   // Open the file, seek to the start of the split, then wrap it in a
+   // CSVLineReader
+   fileIn = file.getFileSystem(job).open(file);
+   fileIn.seek(start);
+   csvLineReader = new CSVLineReader(fileIn, this.fileStreamBufferSize, inputFileEncoding, this.openQuote,
+       this.closeQuote, this.escape);
+ }
+
+ /**
+  * Increments the key and value pair for this reader
+  *
+  * @return true if there is another key/value to be read, false if not.
+  * @throws IOException
+  *           if an IOException occurs while handling the file to be read
+  */
+ @Override
+ public boolean nextKeyValue() throws IOException {
+   if (key == null) {
+     key = new LongWritable();
+   }
+   key.set(pos);
+   if (value == null) {
+     value = new Text();
+   }
+
+   if (pos >= end) {
+     key = null;
+     value = null;
+     LOGGER.info("End of split reached, ending processing. Total records read for this split: {}",
+         totalRecordsRead);
+     close();
+     return false;
+   }
+
+   final int newSize = csvLineReader.readCSVLine(value);
+
+   if (newSize == 0) {
+     LOGGER.info("End of file reached. Ending processing. Total records read for this split: {}", totalRecordsRead);
+     return false;
+   }
+
+   pos += newSize;
+   totalRecordsRead++;
+   return true;
+ }
+
+ /**
+  * Returns the current key
+  *
+  * @return the key corresponding to the current value
+  */
+ @Override
+ public LongWritable getCurrentKey() {
+   return key;
+ }
+
+ /**
+  * Returns the current value
+  *
+  * @return the value corresponding to the current key
+  */
+ @Override
+ public Text getCurrentValue() {
+   return value;
+ }
+
+ /**
+  * Get the progress within the split
+  */
+ @Override
+ public float getProgress() {
+   if (start == end) {
+     return 0.0f;
+   }
+   return Math.min(1.0f, (pos - start) / (float) (end - start));
+ }
+
+ /**
+  * Closes the file input stream for this record reader. Safe to call before
+  * {@link #initialize} and more than once: nextKeyValue() closes the stream
+  * at the end of the split, and the framework may close it again afterwards.
+  */
+ @Override
+ public synchronized void close() throws IOException {
+   if (fileIn != null) {
+     fileIn.close();
+     fileIn = null;
+   }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/crunch/blob/ce79a7a1/crunch-core/src/test/java/org/apache/crunch/io/text/csv/CSVLineReaderTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/io/text/csv/CSVLineReaderTest.java b/crunch-core/src/test/java/org/apache/crunch/io/text/csv/CSVLineReaderTest.java
new file mode 100644
index 0000000..55f2f1f
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/io/text/csv/CSVLineReaderTest.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.text.csv;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.hadoop.io.Text;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class CSVLineReaderTest {
+ @Rule
+ public transient TemporaryPath tmpDir = TemporaryPaths.create();
+
+ /**
+  * Reads a single record made of assorted multi-byte UTF-8 characters.
+  *
+  * @throws IOException
+  */
+ @Test
+ public void testVariousUTF8Characters() throws IOException {
+   final String variousCharacters = "€Abبώиב¥£€¢₡₢₣₤₥₦§₧₨₩₪₫₭₮漢Ä©óíßä";
+   String utf8Junk = tmpDir.copyResourceFileName("UTF8.csv");
+   // Open outside the try block so a failed open is reported as-is instead of
+   // being masked by a NullPointerException in the finally block.
+   final FileInputStream fileInputStream = new FileInputStream(new File(utf8Junk));
+   try {
+     final CSVLineReader csvLineReader = new CSVLineReader(fileInputStream);
+     final Text readText = new Text();
+     csvLineReader.readCSVLine(readText);
+     assertEquals(variousCharacters, readText.toString());
+   } finally {
+     fileInputStream.close();
+   }
+ }
+
+ /**
+  * This is effectively a mirror of the address tests, but using Chinese
+  * characters, even for the quotation marks and escape characters.
+  *
+  * @throws IOException
+  */
+ @Test
+ public void testBrokenLineParsingInChinese() throws IOException {
+   final String[] expectedChineseLines = { "您好我叫马克,我从亚拉巴马州来,我是软件工程师,我二十八岁", "我有一个宠物,它是一个小猫,它六岁,它很漂亮",
+       "我喜欢吃饭,“我觉得这个饭最好\n*蛋糕\n*包子\n*冰淇淋\n*啤酒“,他们都很好,我也很喜欢奶酪但它是不健康的", "我是男的,我的头发很短,我穿蓝色的裤子,“我穿黑色的、“衣服”" };
+   String chineseLines = tmpDir.copyResourceFileName("brokenChineseLines.csv");
+   final FileInputStream fileInputStream = new FileInputStream(new File(chineseLines));
+   try {
+     final CSVLineReader csvLineReader = new CSVLineReader(fileInputStream, CSVLineReader.DEFAULT_BUFFER_SIZE,
+         CSVLineReader.DEFAULT_INPUT_FILE_ENCODING, '“', '”', '、');
+     for (int i = 0; i < expectedChineseLines.length; i++) {
+       final Text readText = new Text();
+       csvLineReader.readCSVLine(readText);
+       assertEquals(expectedChineseLines[i], readText.toString());
+     }
+   } finally {
+     fileInputStream.close();
+   }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/crunch/blob/ce79a7a1/crunch-core/src/test/java/org/apache/crunch/io/text/csv/CSVRecordIteratorTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/io/text/csv/CSVRecordIteratorTest.java b/crunch-core/src/test/java/org/apache/crunch/io/text/csv/CSVRecordIteratorTest.java
new file mode 100644
index 0000000..306a617
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/io/text/csv/CSVRecordIteratorTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.text.csv;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class CSVRecordIteratorTest {
+ @Rule
+ public transient TemporaryPath tmpDir = TemporaryPaths.create();
+
+ /**
+ * Plain CSV with no quoting: each physical line should be returned as one record.
+ */
+ @Test
+ public void testVanillaCSV() throws IOException {
+ String[] expectedFileContents = { "1,2,3,4", "5,6,7,8", "9,10,11", "12,13,14" };
+ assertArrayEquals(expectedFileContents, readAllRecords("vanilla.csv"));
+ }
+
+ /**
+ * Quoted fields containing newlines: a record spanning several physical lines
+ * should be returned as a single record with its embedded newlines intact.
+ */
+ @Test
+ public void testCSVWithNewlines() throws IOException {
+ String[] expectedFileContents = {
+ "\"Champion, Mac\",\"1234 Hoth St.\n\tApartment 101\n\tAtlanta, GA\n\t64086\",\"30\",\"M\",\"5/28/2010 12:00:00 AM\",\"Just some guy\"",
+ "\"Champion, Mac\",\"5678 Tatooine Rd. Apt 5, Mobile, AL 36608\",\"30\",\"M\",\"Some other date\",\"short description\"" };
+ assertArrayEquals(expectedFileContents, readAllRecords("withNewlines.csv"));
+ }
+
+ /**
+ * Copies the named test resource into the temporary directory, drains a
+ * {@link CSVRecordIterator} over it, and returns every record in order.
+ * The underlying stream is always closed, even if iteration fails.
+ *
+ * @param resourceName name of the CSV resource to copy and read
+ * @return all records produced by the iterator, in iteration order
+ * @throws IOException if the resource cannot be copied, opened, or read
+ */
+ private String[] readAllRecords(final String resourceName) throws IOException {
+ final String csvFileName = tmpDir.copyResourceFileName(resourceName);
+ final InputStream inputStream = new FileInputStream(new File(csvFileName));
+ try {
+ final CSVRecordIterator csvRecordIterator = new CSVRecordIterator(inputStream);
+ final ArrayList<String> records = new ArrayList<String>();
+ while (csvRecordIterator.hasNext()) {
+ records.add(csvRecordIterator.next());
+ }
+ // Returning an array lets callers use assertArrayEquals, which also
+ // fails when the iterator yields more or fewer records than expected.
+ return records.toArray(new String[records.size()]);
+ } finally {
+ inputStream.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/ce79a7a1/crunch-test/src/main/resources/UTF8.csv
----------------------------------------------------------------------
diff --git a/crunch-test/src/main/resources/UTF8.csv b/crunch-test/src/main/resources/UTF8.csv
new file mode 100644
index 0000000..6f0b647
--- /dev/null
+++ b/crunch-test/src/main/resources/UTF8.csv
@@ -0,0 +1 @@
+€Abبώиב¥£€¢₡₢₣₤₥₦§₧₨₩₪₫₭₮漢Ä©óíßä
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/crunch/blob/ce79a7a1/crunch-test/src/main/resources/brokenChineseLines.csv
----------------------------------------------------------------------
diff --git a/crunch-test/src/main/resources/brokenChineseLines.csv b/crunch-test/src/main/resources/brokenChineseLines.csv
new file mode 100644
index 0000000..a082afc
--- /dev/null
+++ b/crunch-test/src/main/resources/brokenChineseLines.csv
@@ -0,0 +1,8 @@
+您好我叫马克,我从亚拉巴马州来,我是软件工程师,我二十八岁
+我有一个宠物,它是一个小猫,它六岁,它很漂亮
+我喜欢吃饭,“我觉得这个饭最好
+*蛋糕
+*包子
+*冰淇淋
+*啤酒“,他们都很好,我也很喜欢奶酪但它是不健康的
+我是男的,我的头发很短,我穿蓝色的裤子,“我穿黑色的、“衣服”
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/crunch/blob/ce79a7a1/crunch-test/src/main/resources/customQuoteCharWithNewlines.csv
----------------------------------------------------------------------
diff --git a/crunch-test/src/main/resources/customQuoteCharWithNewlines.csv b/crunch-test/src/main/resources/customQuoteCharWithNewlines.csv
new file mode 100644
index 0000000..b3644c4
--- /dev/null
+++ b/crunch-test/src/main/resources/customQuoteCharWithNewlines.csv
@@ -0,0 +1,5 @@
+*Champion, Mac*,*1234 Hoth St.
+ Apartment 101
+ Atlanta, GA
+ 64086*,*30*,*M*,*5/28/2010 12:00:00 AM*,*Just some guy*
+*Mac, Champion*,*5678 Tatooine Rd. Apt 5, Mobile, AL 36608*,*30*,*M*,*Some other date*,*short description*
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/crunch/blob/ce79a7a1/crunch-test/src/main/resources/vanilla.csv
----------------------------------------------------------------------
diff --git a/crunch-test/src/main/resources/vanilla.csv b/crunch-test/src/main/resources/vanilla.csv
new file mode 100644
index 0000000..09a3185
--- /dev/null
+++ b/crunch-test/src/main/resources/vanilla.csv
@@ -0,0 +1,4 @@
+1,2,3,4
+5,6,7,8
+9,10,11
+12,13,14
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/crunch/blob/ce79a7a1/crunch-test/src/main/resources/withNewlines.csv
----------------------------------------------------------------------
diff --git a/crunch-test/src/main/resources/withNewlines.csv b/crunch-test/src/main/resources/withNewlines.csv
new file mode 100644
index 0000000..be7903c
--- /dev/null
+++ b/crunch-test/src/main/resources/withNewlines.csv
@@ -0,0 +1,5 @@
+"Champion, Mac","1234 Hoth St.
+ Apartment 101
+ Atlanta, GA
+ 64086","30","M","5/28/2010 12:00:00 AM","Just some guy"
+"Champion, Mac","5678 Tatooine Rd. Apt 5, Mobile, AL 36608","30","M","Some other date","short description"
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/crunch/blob/ce79a7a1/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 9e29799..813c82d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -604,6 +604,7 @@ under the License.
<exclude>**/resources/*.txt</exclude>
<exclude>**/resources/**/*.txt</exclude>
<exclude>**/resources/*.avro</exclude>
+ <exclude>**/resources/*.csv</exclude>
<exclude>**/goal.txt</exclude>
<exclude>**/target/generated-test-sources/**</exclude>
<exclude>**/scripts/scrunch</exclude>