You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@flink.apache.org by fh...@apache.org on 2017/02/27 23:50:41 UTC
flink git commit: [FLINK-5907] [java] Fix handling of trailing empty fields in CsvInputFormat.
Repository: flink
Updated Branches:
refs/heads/release-1.2 19fe04758 -> 5168b9f62
[FLINK-5907] [java] Fix handling of trailing empty fields in CsvInputFormat.
This closes #3417.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5168b9f6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5168b9f6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5168b9f6
Branch: refs/heads/release-1.2
Commit: 5168b9f62a05176aca5bd3c094241daaa4d14b2e
Parents: 19fe047
Author: Kurt Young <yk...@gmail.com>
Authored: Sat Feb 25 16:37:37 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Feb 28 00:49:35 2017 +0100
----------------------------------------------------------------------
.../api/common/io/GenericCsvInputFormat.java | 30 +++++++-
.../apache/flink/types/parser/FieldParser.java | 21 +++++
.../common/io/GenericCsvInputFormatTest.java | 4 +-
.../flink/types/parser/FieldParserTest.java | 46 +++++++++++
.../flink/api/java/io/RowCsvInputFormat.java | 13 +++-
.../flink/api/java/io/CsvInputFormatTest.java | 81 +++++++++++++++++++-
.../api/java/io/RowCsvInputFormatTest.java | 75 ++++++++++++++++--
7 files changed, 258 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5168b9f6/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
index 20c643e..b934d41 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
@@ -358,14 +358,14 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
for (int field = 0, output = 0; field < fieldIncluded.length; field++) {
// check valid start position
- if (startPos >= limit) {
+ if (startPos > limit || (startPos == limit && field != fieldIncluded.length - 1)) {
if (lenient) {
return false;
} else {
throw new ParseException("Row too short: " + new String(bytes, offset, numBytes));
}
}
-
+
if (fieldIncluded[field]) {
// parse field
@SuppressWarnings("unchecked")
@@ -373,7 +373,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
Object reuse = holders[output];
startPos = parser.resetErrorStateAndParse(bytes, startPos, limit, this.fieldDelim, reuse);
holders[output] = parser.getLastResult();
-
+
// check parse result
if (startPos < 0) {
// no good
@@ -387,6 +387,17 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
+ "in file: " + filePath);
}
}
+ else if (startPos == limit
+ && field != fieldIncluded.length - 1
+ && !FieldParser.endsWithDelimiter(bytes, startPos - 1, fieldDelim)) {
+ // We are at the end of the record, but not all fields have been read
+ // and the end is not a field delimiter indicating an empty last field.
+ if (lenient) {
+ return false;
+ } else {
+ throw new ParseException("Row too short: " + new String(bytes, offset, numBytes));
+ }
+ }
output++;
}
else {
@@ -398,6 +409,19 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
throw new ParseException("Line could not be parsed: '" + lineAsString+"'\n"
+ "Expect field types: "+fieldTypesToString()+" \n"
+ "in file: "+filePath);
+ } else {
+ return false;
+ }
+ }
+ else if (startPos == limit
+ && field != fieldIncluded.length - 1
+ && !FieldParser.endsWithDelimiter(bytes, startPos - 1, fieldDelim)) {
+ // We are at the end of the record, but not all fields have been read
+ // and the end is not a field delimiter indicating an empty last field.
+ if (lenient) {
+ return false;
+ } else {
+ throw new ParseException("Row too short: " + new String(bytes, offset, numBytes));
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5168b9f6/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
index cf3c83d..c45f820 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
@@ -156,6 +156,27 @@ public abstract class FieldParser<T> {
return true;
}
+
+ /**
+ * Checks if the given bytes ends with the delimiter at the given end position.
+ *
+ * @param bytes The byte array that holds the value.
+ * @param endPos The index of the byte array where the check for the delimiter ends.
+ * @param delim The delimiter to check for.
+ *
+ * @return true if a delimiter ends at the given end position, false otherwise.
+ */
+ public static final boolean endsWithDelimiter(byte[] bytes, int endPos, byte[] delim) {
+ if (endPos < delim.length - 1) {
+ return false;
+ }
+ for (int pos = 0; pos < delim.length; ++pos) {
+ if (delim[pos] != bytes[endPos - delim.length + 1 + pos]) {
+ return false;
+ }
+ }
+ return true;
+ }
/**
* Sets the error state of the parser. Called by subclasses of the parser to set the type of error
http://git-wip-us.apache.org/repos/asf/flink/blob/5168b9f6/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
index c11a573..4873fa8 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
@@ -522,14 +522,14 @@ public class GenericCsvInputFormatTest {
"kkz|777|foobar|hhg\n" + // wrong data type in field
"kkz|777foobarhhg \n" + // too short, a skipped field never ends
"xyx|ignored|42|\n"; // another good line
- final FileInputSplit split = createTempFile(fileContent);
+ final FileInputSplit split = createTempFile(fileContent);
final Configuration parameters = new Configuration();
format.setFieldDelimiter("|");
format.setFieldTypesGeneric(StringValue.class, null, IntValue.class);
format.setLenient(true);
-
+
format.configure(parameters);
format.open(split);
http://git-wip-us.apache.org/repos/asf/flink/blob/5168b9f6/flink-core/src/test/java/org/apache/flink/types/parser/FieldParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/FieldParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/FieldParserTest.java
new file mode 100644
index 0000000..bcb2bfb
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/FieldParserTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.types.parser;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class FieldParserTest {
+
+ @Test
+ public void testDelimiterNext() throws Exception {
+ byte[] bytes = "aaabc".getBytes();
+ byte[] delim = "aa".getBytes();
+ assertTrue(FieldParser.delimiterNext(bytes, 0, delim));
+ assertTrue(FieldParser.delimiterNext(bytes, 1, delim));
+ assertFalse(FieldParser.delimiterNext(bytes, 2, delim));
+ }
+
+ @Test
+ public void testEndsWithDelimiter() throws Exception {
+ byte[] bytes = "aabc".getBytes();
+ byte[] delim = "ab".getBytes();
+ assertFalse(FieldParser.endsWithDelimiter(bytes, 0, delim));
+ assertFalse(FieldParser.endsWithDelimiter(bytes, 1, delim));
+ assertTrue(FieldParser.endsWithDelimiter(bytes, 2, delim));
+ assertFalse(FieldParser.endsWithDelimiter(bytes, 3, delim));
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/5168b9f6/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
index af2e9e4..ce37c74 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
@@ -151,7 +151,7 @@ public class RowCsvInputFormat extends CsvInputFormat<Row> implements ResultType
while (field < fieldIncluded.length) {
// check valid start position
- if (startPos >= limit) {
+ if (startPos > limit || (startPos == limit && field != fieldIncluded.length - 1)) {
if (isLenient()) {
return false;
} else {
@@ -198,6 +198,17 @@ public class RowCsvInputFormat extends CsvInputFormat<Row> implements ResultType
throw new ParseException(String.format("Unexpected parser position for column %1$s of row '%2$s'",
field, new String(bytes, offset, numBytes)));
}
+ else if (startPos == limit
+ && field != fieldIncluded.length - 1
+ && !FieldParser.endsWithDelimiter(bytes, startPos - 1, fieldDelimiter)) {
+ // We are at the end of the record, but not all fields have been read
+ // and the end is not a field delimiter indicating an empty last field.
+ if (isLenient()) {
+ return false;
+ } else {
+ throw new ParseException("Row too short: " + new String(bytes, offset, numBytes));
+ }
+ }
field++;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5168b9f6/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
index cc0d5bc..a303ff7 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
@@ -430,7 +430,7 @@ public class CsvInputFormatTest {
assertEquals("", result.f0);
assertEquals("", result.f1);
assertEquals("", result.f2);
-
+
result = format.nextRecord(result);
assertNull(result);
assertTrue(format.reachedEnd());
@@ -441,6 +441,57 @@ public class CsvInputFormatTest {
}
@Test
+ public void testTailingEmptyFields() throws Exception {
+ final String fileContent = "aa,bb,cc\n" + // ok
+ "aa,bb,\n" + // the last field is empty
+ "aa,,\n" + // the last two fields are empty
+ ",,\n" + // all fields are empty
+ "aa,bb"; // row too short
+ final FileInputSplit split = createTempFile(fileContent);
+
+ final TupleTypeInfo<Tuple3<String, String, String>> typeInfo =
+ TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class, String.class);
+ final CsvInputFormat<Tuple3<String, String, String>> format =
+ new TupleCsvInputFormat<Tuple3<String, String, String>>(PATH, typeInfo);
+
+ format.setFieldDelimiter(",");
+
+ format.configure(new Configuration());
+ format.open(split);
+
+ Tuple3<String, String, String> result = new Tuple3<String, String, String>();
+
+ result = format.nextRecord(result);
+ assertNotNull(result);
+ assertEquals("aa", result.f0);
+ assertEquals("bb", result.f1);
+ assertEquals("cc", result.f2);
+
+ result = format.nextRecord(result);
+ assertNotNull(result);
+ assertEquals("aa", result.f0);
+ assertEquals("bb", result.f1);
+ assertEquals("", result.f2);
+
+ result = format.nextRecord(result);
+ assertNotNull(result);
+ assertEquals("aa", result.f0);
+ assertEquals("", result.f1);
+ assertEquals("", result.f2);
+
+ result = format.nextRecord(result);
+ assertNotNull(result);
+ assertEquals("", result.f0);
+ assertEquals("", result.f1);
+ assertEquals("", result.f2);
+
+ try {
+ format.nextRecord(result);
+ fail("Parse Exception was not thrown! (Row too short)");
+ } catch (ParseException e) {}
+ }
+
+ @Test
public void testIntegerFields() throws IOException {
try {
final String fileContent = "111|222|333|444|555\n666|777|888|999|000|\n";
@@ -957,6 +1008,34 @@ public class CsvInputFormatTest {
}
@Test
+ public void testPojoTypeWithTrailingEmptyFields() throws Exception {
+ final String fileContent = "123,,3.123,,\n456,BBB,3.23,,";
+ final FileInputSplit split = createTempFile(fileContent);
+
+ @SuppressWarnings("unchecked")
+ PojoTypeInfo<PrivatePojoItem> typeInfo = (PojoTypeInfo<PrivatePojoItem>) TypeExtractor.createTypeInfo(PrivatePojoItem.class);
+ CsvInputFormat<PrivatePojoItem> inputFormat = new PojoCsvInputFormat<PrivatePojoItem>(PATH, typeInfo);
+
+ inputFormat.configure(new Configuration());
+ inputFormat.open(split);
+
+ PrivatePojoItem item = new PrivatePojoItem();
+ inputFormat.nextRecord(item);
+
+ assertEquals(123, item.field1);
+ assertEquals("", item.field2);
+ assertEquals(Double.valueOf(3.123), item.field3);
+ assertEquals("", item.field4);
+
+ inputFormat.nextRecord(item);
+
+ assertEquals(456, item.field1);
+ assertEquals("BBB", item.field2);
+ assertEquals(Double.valueOf(3.23), item.field3);
+ assertEquals("", item.field4);
+ }
+
+ @Test
public void testPojoTypeWithMappingInformation() throws Exception {
File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
tempFile.deleteOnExit();
http://git-wip-us.apache.org/repos/asf/flink/blob/5168b9f6/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
index b819641..f6bda30 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
@@ -230,7 +230,7 @@ public class RowCsvInputFormatTest {
@Test
public void readStringFields() throws Exception {
- String fileContent = "abc|def|ghijk\nabc||hhg\n|||";
+ String fileContent = "abc|def|ghijk\nabc||hhg\n|||\n||";
FileInputSplit split = createTempFile(fileContent);
@@ -264,13 +264,19 @@ public class RowCsvInputFormatTest {
assertEquals("", result.getField(2));
result = format.nextRecord(result);
+ assertNotNull(result);
+ assertEquals("", result.getField(0));
+ assertEquals("", result.getField(1));
+ assertEquals("", result.getField(2));
+
+ result = format.nextRecord(result);
assertNull(result);
assertTrue(format.reachedEnd());
}
@Test
public void readMixedQuotedStringFields() throws Exception {
- String fileContent = "@a|b|c@|def|@ghijk@\nabc||@|hhg@\n|||";
+ String fileContent = "@a|b|c@|def|@ghijk@\nabc||@|hhg@\n|||\n";
FileInputSplit split = createTempFile(fileContent);
@@ -351,6 +357,65 @@ public class RowCsvInputFormatTest {
}
@Test
+ public void testTailingEmptyFields() throws Exception {
+ String fileContent = "abc|-def|-ghijk\n" +
+ "abc|-def|-\n" +
+ "abc|-|-\n" +
+ "|-|-|-\n" +
+ "|-|-\n" +
+ "abc|-def\n";
+
+ FileInputSplit split = createTempFile(fileContent);
+
+ TypeInformation[] fieldTypes = new TypeInformation[]{
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO};
+
+ RowCsvInputFormat format = new RowCsvInputFormat(PATH, fieldTypes, "\n", "|");
+ format.setFieldDelimiter("|-");
+ format.configure(new Configuration());
+ format.open(split);
+
+ Row result = new Row(3);
+
+ result = format.nextRecord(result);
+ assertNotNull(result);
+ assertEquals("abc", result.getField(0));
+ assertEquals("def", result.getField(1));
+ assertEquals("ghijk", result.getField(2));
+
+ result = format.nextRecord(result);
+ assertNotNull(result);
+ assertEquals("abc", result.getField(0));
+ assertEquals("def", result.getField(1));
+ assertEquals("", result.getField(2));
+
+ result = format.nextRecord(result);
+ assertNotNull(result);
+ assertEquals("abc", result.getField(0));
+ assertEquals("", result.getField(1));
+ assertEquals("", result.getField(2));
+
+ result = format.nextRecord(result);
+ assertNotNull(result);
+ assertEquals("", result.getField(0));
+ assertEquals("", result.getField(1));
+ assertEquals("", result.getField(2));
+
+ result = format.nextRecord(result);
+ assertNotNull(result);
+ assertEquals("", result.getField(0));
+ assertEquals("", result.getField(1));
+ assertEquals("", result.getField(2));
+
+ try {
+ format.nextRecord(result);
+ fail("Parse Exception was not thrown! (Row too short)");
+ } catch (ParseException e) {}
+ }
+
+ @Test
public void testIntegerFields() throws Exception {
String fileContent = "111|222|333|444|555\n666|777|888|999|000|\n";
@@ -396,12 +461,12 @@ public class RowCsvInputFormatTest {
public void testEmptyFields() throws Exception {
String fileContent =
",,,,,,,,\n" +
+ ",,,,,,,\n" +
",,,,,,,,\n" +
+ ",,,,,,,\n" +
",,,,,,,,\n" +
",,,,,,,,\n" +
- ",,,,,,,,\n" +
- ",,,,,,,,\n" +
- ",,,,,,,,\n" +
+ ",,,,,,,\n" +
",,,,,,,,\n";
FileInputSplit split = createTempFile(fileContent);