You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/02/27 23:50:41 UTC

flink git commit: [FLINK-5907] [java] Fix handling of trailing empty fields in CsvInputFormat.

Repository: flink
Updated Branches:
  refs/heads/release-1.2 19fe04758 -> 5168b9f62


[FLINK-5907] [java] Fix handling of trailing empty fields in CsvInputFormat.

This closes #3417.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5168b9f6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5168b9f6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5168b9f6

Branch: refs/heads/release-1.2
Commit: 5168b9f62a05176aca5bd3c094241daaa4d14b2e
Parents: 19fe047
Author: Kurt Young <yk...@gmail.com>
Authored: Sat Feb 25 16:37:37 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Feb 28 00:49:35 2017 +0100

----------------------------------------------------------------------
 .../api/common/io/GenericCsvInputFormat.java    | 30 +++++++-
 .../apache/flink/types/parser/FieldParser.java  | 21 +++++
 .../common/io/GenericCsvInputFormatTest.java    |  4 +-
 .../flink/types/parser/FieldParserTest.java     | 46 +++++++++++
 .../flink/api/java/io/RowCsvInputFormat.java    | 13 +++-
 .../flink/api/java/io/CsvInputFormatTest.java   | 81 +++++++++++++++++++-
 .../api/java/io/RowCsvInputFormatTest.java      | 75 ++++++++++++++++--
 7 files changed, 258 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5168b9f6/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
index 20c643e..b934d41 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
@@ -358,14 +358,14 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
 		for (int field = 0, output = 0; field < fieldIncluded.length; field++) {
 			
 			// check valid start position
-			if (startPos >= limit) {
+			if (startPos > limit || (startPos == limit && field != fieldIncluded.length - 1)) {
 				if (lenient) {
 					return false;
 				} else {
 					throw new ParseException("Row too short: " + new String(bytes, offset, numBytes));
 				}
 			}
-			
+
 			if (fieldIncluded[field]) {
 				// parse field
 				@SuppressWarnings("unchecked")
@@ -373,7 +373,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
 				Object reuse = holders[output];
 				startPos = parser.resetErrorStateAndParse(bytes, startPos, limit, this.fieldDelim, reuse);
 				holders[output] = parser.getLastResult();
-				
+
 				// check parse result
 				if (startPos < 0) {
 					// no good
@@ -387,6 +387,17 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
 								+ "in file: " + filePath);
 					}
 				}
+				else if (startPos == limit
+						&& field != fieldIncluded.length - 1
+						&& !FieldParser.endsWithDelimiter(bytes, startPos - 1, fieldDelim)) {
+					// We are at the end of the record, but not all fields have been read
+					// and the end is not a field delimiter indicating an empty last field.
+					if (lenient) {
+						return false;
+					} else {
+						throw new ParseException("Row too short: " + new String(bytes, offset, numBytes));
+					}
+				}
 				output++;
 			}
 			else {
@@ -398,6 +409,19 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
 						throw new ParseException("Line could not be parsed: '" + lineAsString+"'\n"
 								+ "Expect field types: "+fieldTypesToString()+" \n"
 								+ "in file: "+filePath);
+					} else {
+						return false;
+					}
+				}
+				else if (startPos == limit
+						&& field != fieldIncluded.length - 1
+						&& !FieldParser.endsWithDelimiter(bytes, startPos - 1, fieldDelim)) {
+					// We are at the end of the record, but not all fields have been read
+					// and the end is not a field delimiter indicating an empty last field.
+					if (lenient) {
+						return false;
+					} else {
+						throw new ParseException("Row too short: " + new String(bytes, offset, numBytes));
 					}
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/5168b9f6/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
index cf3c83d..c45f820 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
@@ -156,6 +156,27 @@ public abstract class FieldParser<T> {
 		return true;
 		
 	}
+
+	/**
+	 * Checks whether the delimiter occupies the bytes that end at the given position (inclusive).
+	 *
+	 * @param bytes  The byte array that holds the value.
+	 * @param endPos The index of the last byte to compare, i.e. where the delimiter must end.
+	 * @param delim  The delimiter to check for.
+	 *
+	 * @return true if bytes[endPos - delim.length + 1 .. endPos] equals delim, false otherwise.
+	 */
+	public static final boolean endsWithDelimiter(byte[] bytes, int endPos, byte[] delim) {
+		if (endPos < delim.length - 1) { // too few bytes in front of endPos to hold the whole delimiter
+			return false;
+		}
+		for (int pos = 0; pos < delim.length; ++pos) { // NOTE: no upper-bound check, endPos must be < bytes.length
+			if (delim[pos] != bytes[endPos - delim.length + 1 + pos]) { // compare against the window ending at endPos
+				return false;
+			}
+		}
+		return true;
+	}
 	
 	/**
 	 * Sets the error state of the parser. Called by subclasses of the parser to set the type of error

http://git-wip-us.apache.org/repos/asf/flink/blob/5168b9f6/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
index c11a573..4873fa8 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
@@ -522,14 +522,14 @@ public class GenericCsvInputFormatTest {
 									"kkz|777|foobar|hhg\n" +  // wrong data type in field
 									"kkz|777foobarhhg  \n" +  // too short, a skipped field never ends
 									"xyx|ignored|42|\n";      // another good line
-			final FileInputSplit split = createTempFile(fileContent);	
+			final FileInputSplit split = createTempFile(fileContent);
 		
 			final Configuration parameters = new Configuration();
 
 			format.setFieldDelimiter("|");
 			format.setFieldTypesGeneric(StringValue.class, null, IntValue.class);
 			format.setLenient(true);
-			
+
 			format.configure(parameters);
 			format.open(split);
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/5168b9f6/flink-core/src/test/java/org/apache/flink/types/parser/FieldParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/FieldParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/FieldParserTest.java
new file mode 100644
index 0000000..bcb2bfb
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/FieldParserTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.types.parser;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class FieldParserTest { // unit tests for the static delimiter helpers of FieldParser
+
+	@Test
+	public void testDelimiterNext() throws Exception {
+		byte[] bytes = "aaabc".getBytes();
+		byte[] delim = "aa".getBytes();
+		assertTrue(FieldParser.delimiterNext(bytes, 0, delim)); // bytes[0..1] is "aa"
+		assertTrue(FieldParser.delimiterNext(bytes, 1, delim)); // bytes[1..2] is "aa"
+		assertFalse(FieldParser.delimiterNext(bytes, 2, delim)); // bytes[2..3] is "ab"
+	}
+
+	@Test
+	public void testEndsWithDelimiter() throws Exception {
+		byte[] bytes = "aabc".getBytes();
+		byte[] delim = "ab".getBytes();
+		assertFalse(FieldParser.endsWithDelimiter(bytes, 0, delim)); // only one byte up to index 0, delimiter needs two
+		assertFalse(FieldParser.endsWithDelimiter(bytes, 1, delim)); // bytes[0..1] is "aa"
+		assertTrue(FieldParser.endsWithDelimiter(bytes, 2, delim)); // bytes[1..2] is "ab"
+		assertFalse(FieldParser.endsWithDelimiter(bytes, 3, delim)); // bytes[2..3] is "bc"
+	}
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/5168b9f6/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
index af2e9e4..ce37c74 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
@@ -151,7 +151,7 @@ public class RowCsvInputFormat extends CsvInputFormat<Row> implements ResultType
 		while (field < fieldIncluded.length) {
 
 			// check valid start position
-			if (startPos >= limit) {
+			if (startPos > limit || (startPos == limit && field != fieldIncluded.length - 1)) {
 				if (isLenient()) {
 					return false;
 				} else {
@@ -198,6 +198,17 @@ public class RowCsvInputFormat extends CsvInputFormat<Row> implements ResultType
 				throw new ParseException(String.format("Unexpected parser position for column %1$s of row '%2$s'",
 					field, new String(bytes, offset, numBytes)));
 			}
+			else if (startPos == limit
+					&& field != fieldIncluded.length - 1
+					&& !FieldParser.endsWithDelimiter(bytes, startPos - 1, fieldDelimiter)) {
+				// We are at the end of the record, but not all fields have been read
+				// and the end is not a field delimiter indicating an empty last field.
+				if (isLenient()) {
+					return false;
+				} else {
+					throw new ParseException("Row too short: " + new String(bytes, offset, numBytes));
+				}
+			}
 
 			field++;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/5168b9f6/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
index cc0d5bc..a303ff7 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
@@ -430,7 +430,7 @@ public class CsvInputFormatTest {
 			assertEquals("", result.f0);
 			assertEquals("", result.f1);
 			assertEquals("", result.f2);
-			
+
 			result = format.nextRecord(result);
 			assertNull(result);
 			assertTrue(format.reachedEnd());
@@ -441,6 +441,57 @@ public class CsvInputFormatTest {
 	}
 
 	@Test
+	public void testTailingEmptyFields() throws Exception { // rows ending in empty fields must parse; a row with too few fields must fail
+		final String fileContent = "aa,bb,cc\n" + // ok
+				"aa,bb,\n" +  // the last field is empty
+				"aa,,\n" +    // the last two fields are empty
+				",,\n" +      // all fields are empty
+				"aa,bb";      // row too short
+		final FileInputSplit split = createTempFile(fileContent);
+
+		final TupleTypeInfo<Tuple3<String, String, String>> typeInfo =
+				TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class, String.class);
+		final CsvInputFormat<Tuple3<String, String, String>> format =
+				new TupleCsvInputFormat<Tuple3<String, String, String>>(PATH, typeInfo);
+
+		format.setFieldDelimiter(",");
+
+		format.configure(new Configuration());
+		format.open(split);
+
+		Tuple3<String, String, String> result = new Tuple3<String, String, String>();
+
+		result = format.nextRecord(result); // row "aa,bb,cc"
+		assertNotNull(result);
+		assertEquals("aa", result.f0);
+		assertEquals("bb", result.f1);
+		assertEquals("cc", result.f2);
+
+		result = format.nextRecord(result); // row "aa,bb,"
+		assertNotNull(result);
+		assertEquals("aa", result.f0);
+		assertEquals("bb", result.f1);
+		assertEquals("", result.f2);
+
+		result = format.nextRecord(result); // row "aa,,"
+		assertNotNull(result);
+		assertEquals("aa", result.f0);
+		assertEquals("", result.f1);
+		assertEquals("", result.f2);
+
+		result = format.nextRecord(result); // row ",,"
+		assertNotNull(result);
+		assertEquals("", result.f0);
+		assertEquals("", result.f1);
+		assertEquals("", result.f2);
+
+		try {
+			format.nextRecord(result); // row "aa,bb" has two fields where three are declared
+			fail("Parse Exception was not thrown! (Row too short)");
+		} catch (ParseException e) { /* expected: the test passes only if the short row is rejected */ }
+	}
+
+	@Test
 	public void testIntegerFields() throws IOException {
 		try {
 			final String fileContent = "111|222|333|444|555\n666|777|888|999|000|\n";
@@ -957,6 +1008,34 @@ public class CsvInputFormatTest {
 	}
 
 	@Test
+	public void testPojoTypeWithTrailingEmptyFields() throws Exception { // both records end with an empty field and must still populate the POJO
+		final String fileContent = "123,,3.123,,\n456,BBB,3.23,,";
+		final FileInputSplit split = createTempFile(fileContent);
+
+		@SuppressWarnings("unchecked")
+		PojoTypeInfo<PrivatePojoItem> typeInfo = (PojoTypeInfo<PrivatePojoItem>) TypeExtractor.createTypeInfo(PrivatePojoItem.class);
+		CsvInputFormat<PrivatePojoItem> inputFormat = new PojoCsvInputFormat<PrivatePojoItem>(PATH, typeInfo);
+
+		inputFormat.configure(new Configuration());
+		inputFormat.open(split);
+
+		PrivatePojoItem item = new PrivatePojoItem();
+		inputFormat.nextRecord(item); // first record "123,,3.123,," is read into the reused item
+
+		assertEquals(123, item.field1);
+		assertEquals("", item.field2);
+		assertEquals(Double.valueOf(3.123), item.field3);
+		assertEquals("", item.field4);
+
+		inputFormat.nextRecord(item); // second record "456,BBB,3.23,," has no terminating line break
+
+		assertEquals(456, item.field1);
+		assertEquals("BBB", item.field2);
+		assertEquals(Double.valueOf(3.23), item.field3);
+		assertEquals("", item.field4);
+	}
+
+	@Test
 	public void testPojoTypeWithMappingInformation() throws Exception {
 		File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
 		tempFile.deleteOnExit();

http://git-wip-us.apache.org/repos/asf/flink/blob/5168b9f6/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
index b819641..f6bda30 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
@@ -230,7 +230,7 @@ public class RowCsvInputFormatTest {
 
 	@Test
 	public void readStringFields() throws Exception {
-		String fileContent = "abc|def|ghijk\nabc||hhg\n|||";
+		String fileContent = "abc|def|ghijk\nabc||hhg\n|||\n||";
 
 		FileInputSplit split = createTempFile(fileContent);
 
@@ -264,13 +264,19 @@ public class RowCsvInputFormatTest {
 		assertEquals("", result.getField(2));
 
 		result = format.nextRecord(result);
+		assertNotNull(result);
+		assertEquals("", result.getField(0));
+		assertEquals("", result.getField(1));
+		assertEquals("", result.getField(2));
+
+		result = format.nextRecord(result);
 		assertNull(result);
 		assertTrue(format.reachedEnd());
 	}
 
 	@Test
 	public void readMixedQuotedStringFields() throws Exception {
-		String fileContent = "@a|b|c@|def|@ghijk@\nabc||@|hhg@\n|||";
+		String fileContent = "@a|b|c@|def|@ghijk@\nabc||@|hhg@\n|||\n";
 
 		FileInputSplit split = createTempFile(fileContent);
 
@@ -351,6 +357,65 @@ public class RowCsvInputFormatTest {
 	}
 
 	@Test
+	public void testTailingEmptyFields() throws Exception { // same trailing-empty-field scenario, with the two-character field delimiter "|-"
+		String fileContent = "abc|-def|-ghijk\n" +
+				"abc|-def|-\n" +
+				"abc|-|-\n" +
+				"|-|-|-\n" +
+				"|-|-\n" +
+				"abc|-def\n";
+
+		FileInputSplit split = createTempFile(fileContent);
+
+		TypeInformation[] fieldTypes = new TypeInformation[]{
+				BasicTypeInfo.STRING_TYPE_INFO,
+				BasicTypeInfo.STRING_TYPE_INFO,
+				BasicTypeInfo.STRING_TYPE_INFO};
+
+		RowCsvInputFormat format = new RowCsvInputFormat(PATH, fieldTypes, "\n", "|");
+		format.setFieldDelimiter("|-"); // set after construction; the data above is separated by "|-", not the "|" passed to the constructor
+		format.configure(new Configuration());
+		format.open(split);
+
+		Row result = new Row(3);
+
+		result = format.nextRecord(result); // row "abc|-def|-ghijk"
+		assertNotNull(result);
+		assertEquals("abc", result.getField(0));
+		assertEquals("def", result.getField(1));
+		assertEquals("ghijk", result.getField(2));
+
+		result = format.nextRecord(result); // row "abc|-def|-"
+		assertNotNull(result);
+		assertEquals("abc", result.getField(0));
+		assertEquals("def", result.getField(1));
+		assertEquals("", result.getField(2));
+
+		result = format.nextRecord(result); // row "abc|-|-"
+		assertNotNull(result);
+		assertEquals("abc", result.getField(0));
+		assertEquals("", result.getField(1));
+		assertEquals("", result.getField(2));
+
+		result = format.nextRecord(result); // row "|-|-|-" (three delimiters)
+		assertNotNull(result);
+		assertEquals("", result.getField(0));
+		assertEquals("", result.getField(1));
+		assertEquals("", result.getField(2));
+
+		result = format.nextRecord(result); // row "|-|-" (two delimiters, empty last field)
+		assertNotNull(result);
+		assertEquals("", result.getField(0));
+		assertEquals("", result.getField(1));
+		assertEquals("", result.getField(2));
+
+		try {
+			format.nextRecord(result); // row "abc|-def" has two fields where three are declared
+			fail("Parse Exception was not thrown! (Row too short)");
+		} catch (ParseException e) { /* expected: the test passes only if the short row is rejected */ }
+	}
+
+	@Test
 	public void testIntegerFields() throws Exception {
 		String fileContent = "111|222|333|444|555\n666|777|888|999|000|\n";
 
@@ -396,12 +461,12 @@ public class RowCsvInputFormatTest {
 	public void testEmptyFields() throws Exception {
 		String fileContent =
 			",,,,,,,,\n" +
+				",,,,,,,\n" +
 				",,,,,,,,\n" +
+				",,,,,,,\n" +
 				",,,,,,,,\n" +
 				",,,,,,,,\n" +
-				",,,,,,,,\n" +
-				",,,,,,,,\n" +
-				",,,,,,,,\n" +
+				",,,,,,,\n" +
 				",,,,,,,,\n";
 
 		FileInputSplit split = createTempFile(fileContent);