You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2014/11/11 12:05:22 UTC
incubator-flink git commit: [FLINK-1223] Allow value escaping in CSV files
Repository: incubator-flink
Updated Branches:
refs/heads/master 2f1176a78 -> e855ef471
[FLINK-1223] Allow value escaping in CSV files
- Quoted strings can now contain " characters, escaped by doubling them ("")
- Skip trailing whitespace after the closing quote
This closes #187.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/e855ef47
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/e855ef47
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/e855ef47
Branch: refs/heads/master
Commit: e855ef4712c6d065d0580b2f58e9485b0909dbf8
Parents: 2f1176a
Author: Johannes <jk...@gmail.com>
Authored: Thu Nov 6 20:19:56 2014 +0100
Committer: uce <uc...@apache.org>
Committed: Tue Nov 11 12:04:27 2014 +0100
----------------------------------------------------------------------
.../api/common/io/GenericCsvInputFormat.java | 16 +--
.../apache/flink/types/parser/StringParser.java | 113 +++++++++++++------
.../api/java/record/io/CsvInputFormat.java | 7 +-
.../flink/api/java/io/CsvInputFormatTest.java | 109 +++++++++++++++---
4 files changed, 181 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e855ef47/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
index 30a7358..a6ee1da 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
@@ -19,16 +19,15 @@
package org.apache.flink.api.common.io;
-import java.io.IOException;
-import java.util.ArrayList;
-
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.types.parser.FieldParser;
import org.apache.flink.util.InstantiationUtil;
-import com.google.common.base.Preconditions;
-import com.google.common.primitives.Ints;
+import java.io.IOException;
+import java.util.ArrayList;
public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT> {
@@ -299,9 +298,10 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
return false;
} else {
String lineAsString = new String(bytes, offset, numBytes);
- throw new ParseException("Line could not be parsed: '" + lineAsString+"'\n"
- + "Expect field types: "+fieldTypesToString()+" \n"
- + "in file: "+filePath);
+ throw new ParseException("Line could not be parsed: '" + lineAsString + "'\n"
+ + "ParserError " + parser.getErrorState() + " \n"
+ + "Expect field types: "+fieldTypesToString() + " \n"
+ + "in file: " + filePath);
}
}
output++;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e855ef47/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java
index a465c1f..a39dbe0 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java
@@ -30,7 +30,11 @@ public class StringParser extends FieldParser<String> {
private static final byte WHITESPACE_TAB = (byte) '\t';
private static final byte QUOTE_DOUBLE = (byte) '"';
-
+
+ private static enum ParserStates {
+ NONE, IN_QUOTE, STOP
+ }
+
private String result;
@Override
@@ -45,51 +49,86 @@ public class StringParser extends FieldParser<String> {
while (i < limit && ((current = bytes[i]) == WHITESPACE_SPACE || current == WHITESPACE_TAB)) {
i++;
}
-
+
+ // first determine the boundaries of the cell
+ ParserStates parserState = ParserStates.NONE;
+
+ // the current position evaluated against the cell boundary
+ int endOfCellPosition = i - 1;
+
+ while (parserState != ParserStates.STOP && endOfCellPosition < limit) {
+ endOfCellPosition++;
+ // make sure we don't step over the end of the buffer
+ if(endOfCellPosition == limit) {
+ break;
+ }
+ current = bytes[endOfCellPosition];
+ if(current == delByte) {
+ // if we are in a quote do nothing, otherwise we reached the end
+ parserState = parserState == ParserStates.IN_QUOTE ? parserState: ParserStates.STOP;
+ } else if(current == QUOTE_DOUBLE) {
+ // we entered a quote
+ if(parserState == ParserStates.IN_QUOTE) {
+ // we end the quote
+ parserState = ParserStates.NONE;
+ } else {
+ // we start a new quote
+ parserState = ParserStates.IN_QUOTE;
+ }
+ }
+ }
+
+ if(parserState == ParserStates.IN_QUOTE) {
+ // exited due to line end without quote termination
+ setErrorState(ParseErrorState.UNTERMINATED_QUOTED_STRING);
+ return -1;
+ }
+
+
+ // boundary of the cell is now
+ // i --> endOfCellPosition
+
// first none whitespace character
if (i < limit && bytes[i] == QUOTE_DOUBLE) {
- // quoted string
- i++; // the quote
-
- // we count only from after the quote
- int quoteStart = i;
- while (i < limit && bytes[i] != QUOTE_DOUBLE) {
- i++;
+
+ // check if there are characters at the end
+ current = bytes[endOfCellPosition - 1];
+
+ // if the character preceding the end of the cell is not a WHITESPACE or the end QUOTE_DOUBLE
+ // there are unquoted characters at the end
+
+ if (!(current == WHITESPACE_SPACE || current == WHITESPACE_TAB || current == QUOTE_DOUBLE)) {
+ setErrorState(ParseErrorState.UNQUOTED_CHARS_AFTER_QUOTED_STRING);
+ return -1; // illegal case of non-whitespace characters trailing
}
-
- if (i < limit) {
- // end of the string
- this.result = new String(bytes, quoteStart, i-quoteStart);
-
- i++; // the quote
-
- // skip trailing whitespace characters
- while (i < limit && (current = bytes[i]) != delByte) {
- if (current == WHITESPACE_SPACE || current == WHITESPACE_TAB) {
- i++;
- }
- else {
- setErrorState(ParseErrorState.UNQUOTED_CHARS_AFTER_QUOTED_STRING);
- return -1; // illegal case of non-whitespace characters trailing
- }
+
+ // skip trailing whitespace after quote .. by moving the cursor backwards
+ int skipAtEnd = 0;
+ while (bytes[endOfCellPosition - 1 - skipAtEnd] == WHITESPACE_SPACE || bytes[endOfCellPosition - 1 - skipAtEnd] == WHITESPACE_TAB) {
+ skipAtEnd++;
+ }
+
+ // now unescape
+ boolean notEscaped = true;
+ int endOfContent = i + 1;
+ for(int counter = endOfContent; counter < endOfCellPosition - skipAtEnd; counter++) {
+ notEscaped = bytes[counter] != QUOTE_DOUBLE || !notEscaped;
+ if (notEscaped) {
+ // realign
+ bytes[endOfContent++] = bytes[counter];
}
-
- return (i == limit ? limit : i+1);
- } else {
- // exited due to line end without quote termination
- setErrorState(ParseErrorState.UNTERMINATED_QUOTED_STRING);
- return -1;
}
+
+ this.result = new String(bytes, i + 1, endOfContent - i - 1);
+
+ return (endOfCellPosition == limit ? limit : endOfCellPosition + 1);
}
else {
// unquoted string
- while (i < limit && bytes[i] != delByte) {
- i++;
- }
-
+
// set from the beginning. unquoted strings include the leading whitespaces
- this.result = new String(bytes, startPos, i-startPos);
- return (i == limit ? limit : i+1);
+ this.result = new String(bytes, i, endOfCellPosition - i);
+ return (endOfCellPosition == limit ? limit : endOfCellPosition + 1);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e855ef47/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvInputFormat.java
index c7f86af..15333e8 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvInputFormat.java
@@ -19,8 +19,7 @@
package org.apache.flink.api.java.record.io;
-import java.io.IOException;
-
+import com.google.common.base.Preconditions;
import org.apache.flink.api.common.io.GenericCsvInputFormat;
import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.api.common.operators.CompilerHints;
@@ -36,7 +35,7 @@ import org.apache.flink.types.Record;
import org.apache.flink.types.Value;
import org.apache.flink.types.parser.FieldParser;
-import com.google.common.base.Preconditions;
+import java.io.IOException;
/**
* Input format to parse text files and generate Records.
@@ -267,7 +266,7 @@ public class CsvInputFormat extends GenericCsvInputFormat<Record> {
/*
* Fix to support windows line endings in CSVInputFiles with standard delimiter setup = \n
*/
- //Find windows end line, so find chariage return before the newline
+ //Find windows end line, so find carriage return before the newline
if(this.lineDelimiterIsLinebreak == true && bytes[offset + numBytes -1] == '\r') {
//reduce the number of bytes so that the Carriage return is not taken as data
numBytes--;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e855ef47/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
index 5bf35e7..0662aa6 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
@@ -19,18 +19,7 @@
package org.apache.flink.api.java.io;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-
+import com.google.common.base.Charsets;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
@@ -38,8 +27,23 @@ import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.types.parser.StringParser;
import org.junit.Test;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
public class CsvInputFormatTest {
private static final Path PATH = new Path("an/ignored/file/");
@@ -345,12 +349,87 @@ public class CsvInputFormatTest {
fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
}
}
-
+
+ @Test
+ public void testParseStringErrors() throws Exception {
+ StringParser stringParser = new StringParser();
+
+ Object[][] failures = {
+ {"\"string\" trailing", FieldParser.ParseErrorState.UNQUOTED_CHARS_AFTER_QUOTED_STRING},
+ {"\"unterminated ", FieldParser.ParseErrorState.UNTERMINATED_QUOTED_STRING}
+ };
+
+ for (Object[] failure : failures) {
+ String input = (String) failure[0];
+
+ int result = stringParser.parseField(input.getBytes(), 0, input.length(), '|', null);
+
+ assertThat(result, is(-1));
+ assertThat(stringParser.getErrorState(), is(failure[1]));
+ }
+
+
+ }
+
+ @Test
+ public void testParserCorrectness() throws Exception {
+ // RFC 4180 Compliance Test content
+ // Taken from http://en.wikipedia.org/wiki/Comma-separated_values#Example
+ final String fileContent =
+ "Year,Make,Model,Description,Price\n" +
+ "1997,Ford,E350,\"ac, abs, moon\",3000.00\n" +
+ "1999,Chevy,\"Venture \"\"Extended Edition\"\"\",\"\",4900.00\n" +
+ "1996,Jeep,Grand Cherokee,\"MUST SELL! air, moon roof, loaded\",4799.00\n" +
+ "1999,Chevy,\"Venture \"\"Extended Edition, Very Large\"\"\",,5000.00\n" +
+ ",,\"Venture \"\"Extended Edition\"\"\",\"\",4900.00";
+
+ final FileInputSplit split = createTempFile(fileContent);
+
+ final CsvInputFormat<Tuple5<Integer, String, String, String, Double>> format =
+ new CsvInputFormat<Tuple5<Integer, String, String, String, Double>>(PATH);
+
+ format.setSkipFirstLineAsHeader(true);
+ format.setFieldDelimiter(',');
+
+ format.setFields(new boolean[] { true, true, true, true, true }, new Class<?>[] {
+ Integer.class, String.class, String.class, String.class, Double.class });
+
+ format.configure(new Configuration());
+ format.open(split);
+
+ Tuple5<Integer, String, String, String, Double> result = new Tuple5<Integer, String, String, String, Double>();
+
+ Tuple5[] expectedLines = new Tuple5[]{
+ new Tuple5<Integer, String, String, String, Double>(1997, "Ford", "E350", "ac, abs, moon", 3000.0),
+ new Tuple5<Integer, String, String, String, Double>(1999, "Chevy", "Venture \"Extended Edition\"", "", 4900.0),
+ new Tuple5<Integer, String, String, String, Double>(1996, "Jeep", "Grand Cherokee", "MUST SELL! air, moon roof, loaded", 4799.00),
+ new Tuple5<Integer, String, String, String, Double>(1999, "Chevy", "Venture \"Extended Edition, Very Large\"", "", 5000.00 ),
+ new Tuple5<Integer, String, String, String, Double>(0, "", "Venture \"Extended Edition\"", "", 4900.0)
+ };
+
+ try {
+
+ for (Tuple5 expected : expectedLines) {
+ result = format.nextRecord(result);
+ assertEquals(expected, result);
+ }
+
+ assertNull(format.nextRecord(result));
+ assertTrue(format.reachedEnd());
+
+ } catch (Exception ex) {
+ fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
+ }
+
+ }
+
private FileInputSplit createTempFile(String content) throws IOException {
File tempFile = File.createTempFile("test_contents", "tmp");
tempFile.deleteOnExit();
-
- FileWriter wrt = new FileWriter(tempFile);
+
+ OutputStreamWriter wrt = new OutputStreamWriter(
+ new FileOutputStream(tempFile), Charsets.UTF_8
+ );
wrt.write(content);
wrt.close();