You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by mk...@apache.org on 2014/06/27 03:25:18 UTC
git commit: CRUNCH-429: Fix CSVInputFormat
Repository: crunch
Updated Branches:
refs/heads/master 8027f706a -> ace392bde
CRUNCH-429: Fix CSVInputFormat
Signed-off-by: Micah Whitacre <mk...@apache.org>
CRUNCH-429: Fixed a spelling error, handled a potential NPE, and moved FileSystem retrieval outside of the for loop.
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/ace392bd
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/ace392bd
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/ace392bd
Branch: refs/heads/master
Commit: ace392bde724273b1d300f7b01c4648e9e1b9060
Parents: 8027f70
Author: Mac Champion <ma...@cerner.com>
Authored: Tue Jun 24 12:30:34 2014 -0500
Committer: Micah Whitacre <mk...@apache.org>
Committed: Thu Jun 26 16:12:25 2014 -0500
----------------------------------------------------------------------
.../crunch/io/text/csv/CSVFileSourceIT.java | 66 ++++++++++++--------
.../crunch/io/text/csv/CSVInputFormat.java | 47 ++++++++------
2 files changed, 66 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/ace392bd/crunch-core/src/it/java/org/apache/crunch/io/text/csv/CSVFileSourceIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/text/csv/CSVFileSourceIT.java b/crunch-core/src/it/java/org/apache/crunch/io/text/csv/CSVFileSourceIT.java
index a81f78d..ba8e193 100644
--- a/crunch-core/src/it/java/org/apache/crunch/io/text/csv/CSVFileSourceIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/io/text/csv/CSVFileSourceIT.java
@@ -17,19 +17,18 @@
*/
package org.apache.crunch.io.text.csv;
-import java.io.File;
-import java.io.FileInputStream;
+import static org.junit.Assert.assertTrue;
+
import java.io.IOException;
import java.util.Collection;
-import static org.junit.Assert.*;
import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.test.TemporaryPath;
import org.apache.crunch.test.TemporaryPaths;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
import org.junit.Rule;
import org.junit.Test;
@@ -39,14 +38,30 @@ public class CSVFileSourceIT {
@Test
public void testVanillaCSV() throws Exception {
- String[] expectedFileContents = { "1,2,3,4", "5,6,7,8", "9,10,11", "12,13,14" };
+ final String[] expectedFileContents = { "1,2,3,4", "5,6,7,8", "9,10,11", "12,13,14" };
+
+ final String vanillaCSVFile = tmpDir.copyResourceFileName("vanilla.csv");
+ final Pipeline pipeline = new MRPipeline(CSVFileSourceIT.class, tmpDir.getDefaultConfiguration());
+ final PCollection<String> csvLines = pipeline.read(new CSVFileSource(new Path(vanillaCSVFile)));
+
+ final Collection<String> csvLinesList = csvLines.asCollection().getValue();
+
+ for (int i = 0; i < expectedFileContents.length; i++) {
+ assertTrue(csvLinesList.contains(expectedFileContents[i]));
+ }
+ }
+
+ @Test
+ public void testVanillaCSVWithAdditionalActions() throws Exception {
+ final String[] expectedFileContents = { "1,2,3,4", "5,6,7,8", "9,10,11", "12,13,14" };
- String vanillaCSVFile = tmpDir.copyResourceFileName("vanilla.csv");
- Pipeline pipeline = new MRPipeline(CSVFileSourceIT.class, tmpDir.getDefaultConfiguration());
- PCollection<String> csvLines = pipeline.read(new CSVFileSource(new Path(vanillaCSVFile)));
- pipeline.run();
+ final String vanillaCSVFile = tmpDir.copyResourceFileName("vanilla.csv");
+ final Pipeline pipeline = new MRPipeline(CSVFileSourceIT.class, tmpDir.getDefaultConfiguration());
+ final PCollection<String> csvLines = pipeline.read(new CSVFileSource(new Path(vanillaCSVFile)));
- Collection<String> csvLinesList = csvLines.asCollection().getValue();
+ final PTable<String, Long> countTable = csvLines.count();
+ final PCollection<String> csvLines2 = countTable.keys();
+ final Collection<String> csvLinesList = csvLines2.asCollection().getValue();
for (int i = 0; i < expectedFileContents.length; i++) {
assertTrue(csvLinesList.contains(expectedFileContents[i]));
@@ -55,16 +70,15 @@ public class CSVFileSourceIT {
@Test
public void testCSVWithNewlines() throws Exception {
- String[] expectedFileContents = {
+ final String[] expectedFileContents = {
"\"Champion, Mac\",\"1234 Hoth St.\n\tApartment 101\n\tAtlanta, GA\n\t64086\",\"30\",\"M\",\"5/28/2010 12:00:00 AM\",\"Just some guy\"",
"\"Champion, Mac\",\"5678 Tatooine Rd. Apt 5, Mobile, AL 36608\",\"30\",\"M\",\"Some other date\",\"short description\"" };
- String csvWithNewlines = tmpDir.copyResourceFileName("withNewlines.csv");
- Pipeline pipeline = new MRPipeline(CSVFileSourceIT.class, tmpDir.getDefaultConfiguration());
- PCollection<String> csvLines = pipeline.read(new CSVFileSource(new Path(csvWithNewlines)));
- pipeline.run();
+ final String csvWithNewlines = tmpDir.copyResourceFileName("withNewlines.csv");
+ final Pipeline pipeline = new MRPipeline(CSVFileSourceIT.class, tmpDir.getDefaultConfiguration());
+ final PCollection<String> csvLines = pipeline.read(new CSVFileSource(new Path(csvWithNewlines)));
- Collection<String> csvLinesList = csvLines.asCollection().getValue();
+ final Collection<String> csvLinesList = csvLines.asCollection().getValue();
for (int i = 0; i < expectedFileContents.length; i++) {
assertTrue(csvLinesList.contains(expectedFileContents[i]));
@@ -77,18 +91,17 @@ public class CSVFileSourceIT {
*/
@Test
public void testCSVWithCustomQuoteAndNewlines() throws IOException {
- String[] expectedFileContents = {
+ final String[] expectedFileContents = {
"*Champion, Mac*,*1234 Hoth St.\n\tApartment 101\n\tAtlanta, GA\n\t64086*,*30*,*M*,*5/28/2010 12:00:00 AM*,*Just some guy*",
"*Mac, Champion*,*5678 Tatooine Rd. Apt 5, Mobile, AL 36608*,*30*,*M*,*Some other date*,*short description*" };
- String csvWithNewlines = tmpDir.copyResourceFileName("customQuoteCharWithNewlines.csv");
- Pipeline pipeline = new MRPipeline(CSVFileSourceIT.class, tmpDir.getDefaultConfiguration());
- PCollection<String> csvLines = pipeline.read(new CSVFileSource(new Path(csvWithNewlines),
+ final String csvWithNewlines = tmpDir.copyResourceFileName("customQuoteCharWithNewlines.csv");
+ final Pipeline pipeline = new MRPipeline(CSVFileSourceIT.class, tmpDir.getDefaultConfiguration());
+ final PCollection<String> csvLines = pipeline.read(new CSVFileSource(new Path(csvWithNewlines),
CSVLineReader.DEFAULT_BUFFER_SIZE, CSVLineReader.DEFAULT_INPUT_FILE_ENCODING, '*', '*',
CSVLineReader.DEFAULT_ESCAPE_CHARACTER));
- pipeline.run();
- Collection<String> csvLinesList = csvLines.asCollection().getValue();
+ final Collection<String> csvLinesList = csvLines.asCollection().getValue();
for (int i = 0; i < expectedFileContents.length; i++) {
assertTrue(csvLinesList.contains(expectedFileContents[i]));
@@ -105,13 +118,12 @@ public class CSVFileSourceIT {
public void testBrokenLineParsingInChinese() throws IOException {
final String[] expectedChineseLines = { "您好我叫马克,我从亚拉巴马州来,我是软件工程师,我二十八岁", "我有一个宠物,它是一个小猫,它六岁,它很漂亮",
"我喜欢吃饭,“我觉得这个饭最好\n*蛋糕\n*包子\n*冰淇淋\n*啤酒“,他们都很好,我也很喜欢奶酪但它是不健康的", "我是男的,我的头发很短,我穿蓝色的裤子,“我穿黑色的、“衣服”" };
- String chineseLines = tmpDir.copyResourceFileName("brokenChineseLines.csv");
+ final String chineseLines = tmpDir.copyResourceFileName("brokenChineseLines.csv");
- Pipeline pipeline = new MRPipeline(CSVFileSourceIT.class, tmpDir.getDefaultConfiguration());
- PCollection<String> csvLines = pipeline.read(new CSVFileSource(new Path(chineseLines),
+ final Pipeline pipeline = new MRPipeline(CSVFileSourceIT.class, tmpDir.getDefaultConfiguration());
+ final PCollection<String> csvLines = pipeline.read(new CSVFileSource(new Path(chineseLines),
CSVLineReader.DEFAULT_BUFFER_SIZE, CSVLineReader.DEFAULT_INPUT_FILE_ENCODING, '“', '”', '、'));
- pipeline.run();
- Collection<String> csvLinesList = csvLines.asCollection().getValue();
+ final Collection<String> csvLinesList = csvLines.asCollection().getValue();
for (int i = 0; i < expectedChineseLines.length; i++) {
assertTrue(csvLinesList.contains(expectedChineseLines[i]));
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/ace392bd/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVInputFormat.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVInputFormat.java b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVInputFormat.java
index 5d5abe3..867b704 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVInputFormat.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/text/csv/CSVInputFormat.java
@@ -21,6 +21,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -28,7 +30,6 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
@@ -43,12 +44,13 @@ import com.google.common.annotations.VisibleForTesting;
* format deals with the fact that CSV files can potentially have multiple lines
* within fields which should all be treated as one record.
*/
-public class CSVInputFormat extends FileInputFormat<LongWritable, Text> {
+public class CSVInputFormat extends FileInputFormat<LongWritable, Text> implements Configurable {
private int bufferSize;
private String inputFileEncoding;
private char openQuoteChar;
private char closeQuoteChar;
private char escapeChar;
+ private Configuration configuration;
/**
* This method is used by crunch to get an instance of {@link CSVRecordReader}
@@ -57,10 +59,7 @@ public class CSVInputFormat extends FileInputFormat<LongWritable, Text> {
* the {@link InputSplit} that will be assigned to the record reader
* @param context
* the {@TaskAttemptContext} for the job
- * @return an instance of {@link CSVRecordReader} created using
- * {@link CSVInputFormat#getSeparatorChar()},
- * {@link CSVInputFormat#getQuoteChar()}, and
- * {@link CSVInputFormat#getEscapeChar()}.
+ * @return an instance of {@link CSVRecordReader} created using configured separator, quote, and escape characters.
*/
@Override
public RecordReader<LongWritable, Text> createRecordReader(final InputSplit split, final TaskAttemptContext context) {
@@ -83,17 +82,18 @@ public class CSVInputFormat extends FileInputFormat<LongWritable, Text> {
final long splitSize = job.getConfiguration().getLong("csv.input.split.size", 67108864);
final List<InputSplit> splits = new ArrayList<InputSplit>();
final Path[] paths = FileUtil.stat2Paths(listStatus(job).toArray(new FileStatus[0]));
- FileSystem fileSystem = null;
+ FileSystem fileSystem = FileSystem.get(job.getConfiguration());
FSDataInputStream inputStream = null;
try {
for (final Path path : paths) {
- fileSystem = path.getFileSystem(job.getConfiguration());
inputStream = fileSystem.open(path);
splits.addAll(getSplitsForFile(splitSize, fileSystem.getFileStatus(path).getLen(), path, inputStream));
}
return splits;
} finally {
- inputStream.close();
+ if(inputStream != null) {
+ inputStream.close();
+ }
}
}
@@ -166,44 +166,51 @@ public class CSVInputFormat extends FileInputFormat<LongWritable, Text> {
return splitsList;
}
+ @Override
+ public Configuration getConf() {
+ return configuration;
+ }
+
+ @Override
+ public void setConf(final Configuration conf) {
+ configuration = conf;
+ configure();
+ }
+
/**
- * This method will read the configuration that is set in
+ * This method will read the configuration options that were set in
* {@link CSVFileSource}'s private getBundle() method
- *
- * @param jobConf
- * The {@code JobConf} instance from which the CSV configuration
- * parameters will be read, if necessary.
*/
- public void configure(JobConf jobConf) {
- String bufferValue = jobConf.get(CSVFileSource.CSV_BUFFER_SIZE);
+ public void configure() {
+ final String bufferValue = this.configuration.get(CSVFileSource.CSV_BUFFER_SIZE);
if ("".equals(bufferValue)) {
bufferSize = CSVLineReader.DEFAULT_BUFFER_SIZE;
} else {
bufferSize = Integer.parseInt(bufferValue);
}
- String inputFileEncodingValue = jobConf.get(CSVFileSource.CSV_INPUT_FILE_ENCODING);
+ final String inputFileEncodingValue = this.configuration.get(CSVFileSource.CSV_INPUT_FILE_ENCODING);
if ("".equals(inputFileEncodingValue)) {
inputFileEncoding = CSVLineReader.DEFAULT_INPUT_FILE_ENCODING;
} else {
inputFileEncoding = inputFileEncodingValue;
}
- String openQuoteCharValue = jobConf.get(CSVFileSource.CSV_OPEN_QUOTE_CHAR);
+ final String openQuoteCharValue = this.configuration.get(CSVFileSource.CSV_OPEN_QUOTE_CHAR);
if ("".equals(openQuoteCharValue)) {
openQuoteChar = CSVLineReader.DEFAULT_QUOTE_CHARACTER;
} else {
openQuoteChar = openQuoteCharValue.charAt(0);
}
- String closeQuoteCharValue = jobConf.get(CSVFileSource.CSV_CLOSE_QUOTE_CHAR);
+ final String closeQuoteCharValue = this.configuration.get(CSVFileSource.CSV_CLOSE_QUOTE_CHAR);
if ("".equals(closeQuoteCharValue)) {
closeQuoteChar = CSVLineReader.DEFAULT_QUOTE_CHARACTER;
} else {
closeQuoteChar = closeQuoteCharValue.charAt(0);
}
- String escapeCharValue = jobConf.get(CSVFileSource.CSV_ESCAPE_CHAR);
+ final String escapeCharValue = this.configuration.get(CSVFileSource.CSV_ESCAPE_CHAR);
if ("".equals(escapeCharValue)) {
escapeChar = CSVLineReader.DEFAULT_ESCAPE_CHARACTER;
} else {