You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2014/11/11 12:05:22 UTC

incubator-flink git commit: [FLINK-1223] Allow value escaping in CSV files

Repository: incubator-flink
Updated Branches:
  refs/heads/master 2f1176a78 -> e855ef471


[FLINK-1223] Allow value escaping in CSV files

- Strings can now contain " quoted characters
- Skip trailing whitespace after quote

This closes #187.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/e855ef47
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/e855ef47
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/e855ef47

Branch: refs/heads/master
Commit: e855ef4712c6d065d0580b2f58e9485b0909dbf8
Parents: 2f1176a
Author: Johannes <jk...@gmail.com>
Authored: Thu Nov 6 20:19:56 2014 +0100
Committer: uce <uc...@apache.org>
Committed: Tue Nov 11 12:04:27 2014 +0100

----------------------------------------------------------------------
 .../api/common/io/GenericCsvInputFormat.java    |  16 +--
 .../apache/flink/types/parser/StringParser.java | 113 +++++++++++++------
 .../api/java/record/io/CsvInputFormat.java      |   7 +-
 .../flink/api/java/io/CsvInputFormatTest.java   | 109 +++++++++++++++---
 4 files changed, 181 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e855ef47/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
index 30a7358..a6ee1da 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
@@ -19,16 +19,15 @@
 
 package org.apache.flink.api.common.io;
 
-import java.io.IOException;
-import java.util.ArrayList;
-
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.parser.FieldParser;
 import org.apache.flink.util.InstantiationUtil;
 
-import com.google.common.base.Preconditions;
-import com.google.common.primitives.Ints;
+import java.io.IOException;
+import java.util.ArrayList;
 
 
 public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT> {
@@ -299,9 +298,10 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
 						return false;
 					} else {
 						String lineAsString = new String(bytes, offset, numBytes);
-						throw new ParseException("Line could not be parsed: '" + lineAsString+"'\n"
-								+ "Expect field types: "+fieldTypesToString()+" \n"
-								+ "in file: "+filePath);
+						throw new ParseException("Line could not be parsed: '" + lineAsString + "'\n"
+								+ "ParserError " + parser.getErrorState() + " \n"
+								+ "Expect field types: "+fieldTypesToString() + " \n"
+								+ "in file: " + filePath);
 					}
 				}
 				output++;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e855ef47/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java
index a465c1f..a39dbe0 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java
@@ -30,7 +30,11 @@ public class StringParser extends FieldParser<String> {
 	private static final byte WHITESPACE_TAB = (byte) '\t';
 	
 	private static final byte QUOTE_DOUBLE = (byte) '"';
-	
+
+	private static enum ParserStates {
+		NONE, IN_QUOTE, STOP
+	}
+
 	private String result;
 	
 	@Override
@@ -45,51 +49,86 @@ public class StringParser extends FieldParser<String> {
 		while (i < limit && ((current = bytes[i]) == WHITESPACE_SPACE || current == WHITESPACE_TAB)) {
 			i++;
 		}
-		
+
+		// first determine the boundaries of the cell
+		ParserStates parserState = ParserStates.NONE;
+
+		// the current position evaluated against the cell boundary
+		int endOfCellPosition = i - 1;
+
+		while (parserState != ParserStates.STOP && endOfCellPosition < limit) {
+			endOfCellPosition++;
+			// make sure we don't step over the end of the buffer
+			if(endOfCellPosition == limit) {
+				break;
+			}
+			current = bytes[endOfCellPosition];
+			if(current == delByte) {
+				// if we are in a quote do nothing, otherwise we reached the end
+				parserState = parserState == ParserStates.IN_QUOTE ? parserState: ParserStates.STOP;
+			} else if(current == QUOTE_DOUBLE) {
+				// we entered a quote
+				if(parserState == ParserStates.IN_QUOTE) {
+					// we end the quote
+					parserState = ParserStates.NONE;
+				} else {
+					// we start a new quote
+					parserState = ParserStates.IN_QUOTE;
+				}
+			}
+		}
+
+		if(parserState == ParserStates.IN_QUOTE) {
+			// exited due to line end without quote termination
+			setErrorState(ParseErrorState.UNTERMINATED_QUOTED_STRING);
+			return -1;
+		}
+
+
+		// boundary of the cell is now
+		// i --> endOfCellPosition
+
 		// first none whitespace character
 		if (i < limit && bytes[i] == QUOTE_DOUBLE) {
-			// quoted string
-			i++; // the quote
-			
-			// we count only from after the quote
-			int quoteStart = i;
-			while (i < limit && bytes[i] != QUOTE_DOUBLE) {
-				i++;
+
+			// check if there are characters at the end
+			current = bytes[endOfCellPosition - 1];
+
+			// if the character preceding the end of the cell is not a WHITESPACE or the end QUOTE_DOUBLE
+			// there are unquoted characters at the end
+
+			if (!(current == WHITESPACE_SPACE || current == WHITESPACE_TAB || current == QUOTE_DOUBLE)) {
+				setErrorState(ParseErrorState.UNQUOTED_CHARS_AFTER_QUOTED_STRING);
+				return -1;	// illegal case of non-whitespace characters trailing
 			}
-			
-			if (i < limit) {
-				// end of the string
-				this.result = new String(bytes, quoteStart, i-quoteStart);
-				
-				i++; // the quote
-				
-				// skip trailing whitespace characters 
-				while (i < limit && (current = bytes[i]) != delByte) {
-					if (current == WHITESPACE_SPACE || current == WHITESPACE_TAB) {
-						i++;
-					}
-					else {
-						setErrorState(ParseErrorState.UNQUOTED_CHARS_AFTER_QUOTED_STRING);
-						return -1;	// illegal case of non-whitespace characters trailing
-					}
+
+			// skip trailing whitespace after quote .. by moving the cursor backwards
+			int skipAtEnd = 0;
+			while (bytes[endOfCellPosition - 1 - skipAtEnd] == WHITESPACE_SPACE || bytes[endOfCellPosition - 1 - skipAtEnd] == WHITESPACE_TAB) {
+				skipAtEnd++;
+			}
+
+			// now unescape
+			boolean notEscaped = true;
+			int endOfContent = i + 1;
+			for(int counter = endOfContent; counter < endOfCellPosition - skipAtEnd; counter++) {
+				notEscaped = bytes[counter] != QUOTE_DOUBLE || !notEscaped;
+				if (notEscaped) {
+					// realign
+					bytes[endOfContent++] = bytes[counter];
 				}
-				
-				return (i == limit ? limit : i+1);
-			} else {
-				// exited due to line end without quote termination
-				setErrorState(ParseErrorState.UNTERMINATED_QUOTED_STRING);
-				return -1;
 			}
+
+			this.result = new String(bytes, i + 1, endOfContent - i - 1);
+
+			return (endOfCellPosition == limit ? limit : endOfCellPosition + 1);
 		}
 		else {
 			// unquoted string
-			while (i < limit && bytes[i] != delByte) {
-				i++;
-			}
-			
+
 			// set from the beginning. unquoted strings include the leading whitespaces
-			this.result = new String(bytes, startPos, i-startPos);
-			return (i == limit ? limit : i+1);
+			this.result = new String(bytes, i, endOfCellPosition - i);
+			return (endOfCellPosition == limit ? limit : endOfCellPosition + 1);
 		}
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e855ef47/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvInputFormat.java
index c7f86af..15333e8 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvInputFormat.java
@@ -19,8 +19,7 @@
 
 package org.apache.flink.api.java.record.io;
 
-import java.io.IOException;
-
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.io.GenericCsvInputFormat;
 import org.apache.flink.api.common.io.ParseException;
 import org.apache.flink.api.common.operators.CompilerHints;
@@ -36,7 +35,7 @@ import org.apache.flink.types.Record;
 import org.apache.flink.types.Value;
 import org.apache.flink.types.parser.FieldParser;
 
-import com.google.common.base.Preconditions;
+import java.io.IOException;
 
 /**
  * Input format to parse text files and generate Records. 
@@ -267,7 +266,7 @@ public class CsvInputFormat extends GenericCsvInputFormat<Record> {
 		/*
 		 * Fix to support windows line endings in CSVInputFiles with standard delimiter setup = \n
 		 */
-		//Find windows end line, so find chariage return before the newline 
+		//Find windows end line, so find carriage return before the newline
 		if(this.lineDelimiterIsLinebreak == true && bytes[offset + numBytes -1] == '\r') {
 			//reduce the number of bytes so that the Carriage return is not taken as data
 			numBytes--;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e855ef47/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
index 5bf35e7..0662aa6 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
@@ -19,18 +19,7 @@
 
 package org.apache.flink.api.java.io;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-
+import com.google.common.base.Charsets;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
@@ -38,8 +27,23 @@ import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.types.parser.StringParser;
 import org.junit.Test;
 
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 public class CsvInputFormatTest {
 	
 	private static final Path PATH = new Path("an/ignored/file/");
@@ -345,12 +349,87 @@ public class CsvInputFormatTest {
 			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
 		}
 	}
-	
+
+	@Test
+	public void testParseStringErrors() throws Exception {
+		StringParser stringParser = new StringParser();
+
+		Object[][] failures = {
+				{"\"string\" trailing", FieldParser.ParseErrorState.UNQUOTED_CHARS_AFTER_QUOTED_STRING},
+				{"\"unterminated ", 		FieldParser.ParseErrorState.UNTERMINATED_QUOTED_STRING}
+		};
+
+		for (Object[] failure : failures) {
+			String input = (String) failure[0];
+
+			int result = stringParser.parseField(input.getBytes(), 0, input.length(), '|', null);
+
+			assertThat(result, is(-1));
+			assertThat(stringParser.getErrorState(), is(failure[1]));
+		}
+
+
+	}
+
+	@Test
+	public void testParserCorrectness() throws Exception {
+		// RFC 4180 Compliance Test content
+		// Taken from http://en.wikipedia.org/wiki/Comma-separated_values#Example
+		final String fileContent =
+				"Year,Make,Model,Description,Price\n" +
+				"1997,Ford,E350,\"ac, abs, moon\",3000.00\n" +
+				"1999,Chevy,\"Venture \"\"Extended Edition\"\"\",\"\",4900.00\n" +
+				"1996,Jeep,Grand Cherokee,\"MUST SELL! air, moon roof, loaded\",4799.00\n" +
+				"1999,Chevy,\"Venture \"\"Extended Edition, Very Large\"\"\",,5000.00\n" +
+				",,\"Venture \"\"Extended Edition\"\"\",\"\",4900.00";
+
+		final FileInputSplit split = createTempFile(fileContent);
+
+		final CsvInputFormat<Tuple5<Integer, String, String, String, Double>> format =
+				new CsvInputFormat<Tuple5<Integer, String, String, String, Double>>(PATH);
+
+		format.setSkipFirstLineAsHeader(true);
+		format.setFieldDelimiter(',');
+
+		format.setFields(new boolean[] { true, true, true, true, true }, new Class<?>[] {
+				Integer.class, String.class, String.class, String.class, Double.class });
+
+		format.configure(new Configuration());
+		format.open(split);
+
+		Tuple5<Integer, String, String, String, Double> result = new Tuple5<Integer, String, String, String, Double>();
+
+		Tuple5[] expectedLines = new Tuple5[]{
+				new Tuple5<Integer, String, String, String, Double>(1997, "Ford", "E350", "ac, abs, moon", 3000.0),
+				new Tuple5<Integer, String, String, String, Double>(1999, "Chevy", "Venture \"Extended Edition\"", "", 4900.0),
+				new Tuple5<Integer, String, String, String, Double>(1996, "Jeep", "Grand Cherokee", "MUST SELL! air, moon roof, loaded", 4799.00),
+				new Tuple5<Integer, String, String, String, Double>(1999, "Chevy", "Venture \"Extended Edition, Very Large\"", "", 5000.00	),
+				new Tuple5<Integer, String, String, String, Double>(0, "", "Venture \"Extended Edition\"", "", 4900.0)
+		};
+
+		try {
+
+			for (Tuple5 expected : expectedLines) {
+				result = format.nextRecord(result);
+				assertEquals(expected, result);
+			}
+
+			assertNull(format.nextRecord(result));
+			assertTrue(format.reachedEnd());
+
+		} catch (Exception ex) {
+			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
+		}
+
+	}
+
 	private FileInputSplit createTempFile(String content) throws IOException {
 		File tempFile = File.createTempFile("test_contents", "tmp");
 		tempFile.deleteOnExit();
-		
-		FileWriter wrt = new FileWriter(tempFile);
+
+		OutputStreamWriter wrt = new OutputStreamWriter(
+				new FileOutputStream(tempFile), Charsets.UTF_8
+		);
 		wrt.write(content);
 		wrt.close();