You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@flink.apache.org by tw...@apache.org on 2016/09/19 14:25:16 UTC
flink git commit: [FLINK-4081] [core] [table] FieldParsers should support empty strings
Repository: flink
Updated Branches:
refs/heads/master d7b59d761 -> 4b1a9c72e
[FLINK-4081] [core] [table] FieldParsers should support empty strings
This closes #2297.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4b1a9c72
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4b1a9c72
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4b1a9c72
Branch: refs/heads/master
Commit: 4b1a9c72e99125680035e5dadc148b187d9d4124
Parents: d7b59d7
Author: twalthr <tw...@apache.org>
Authored: Tue Jul 26 16:24:24 2016 +0200
Committer: twalthr <tw...@apache.org>
Committed: Mon Sep 19 14:35:59 2016 +0200
----------------------------------------------------------------------
.../flink/types/parser/BooleanParser.java | 4 +++
.../apache/flink/types/parser/ByteParser.java | 2 +-
.../flink/types/parser/ByteValueParser.java | 2 +-
.../apache/flink/types/parser/DoubleParser.java | 4 +++
.../flink/types/parser/DoubleValueParser.java | 4 +++
.../apache/flink/types/parser/FieldParser.java | 4 +--
.../apache/flink/types/parser/FloatParser.java | 4 +++
.../flink/types/parser/FloatValueParser.java | 4 +++
.../apache/flink/types/parser/IntParser.java | 2 +-
.../flink/types/parser/IntValueParser.java | 2 +-
.../apache/flink/types/parser/LongParser.java | 2 +-
.../flink/types/parser/LongValueParser.java | 2 +-
.../apache/flink/types/parser/ShortParser.java | 2 +-
.../flink/types/parser/ShortValueParser.java | 2 +-
.../apache/flink/types/parser/StringParser.java | 8 ++++-
.../flink/types/parser/StringValueParser.java | 6 ++++
.../types/parser/BooleanValueParserTest.java | 2 +-
.../flink/types/parser/ByteParserTest.java | 3 --
.../flink/types/parser/ByteValueParserTest.java | 2 --
.../flink/types/parser/DoubleParserTest.java | 3 --
.../types/parser/DoubleValueParserTest.java | 2 --
.../flink/types/parser/FloatParserTest.java | 3 --
.../types/parser/FloatValueParserTest.java | 2 --
.../flink/types/parser/IntParserTest.java | 3 --
.../flink/types/parser/IntValueParserTest.java | 2 --
.../flink/types/parser/LongParserTest.java | 3 --
.../flink/types/parser/LongValueParserTest.java | 2 --
.../flink/types/parser/ParserTestBase.java | 2 ++
.../types/parser/QuotedStringParserTest.java | 2 +-
.../parser/QuotedStringValueParserTest.java | 4 +--
.../flink/types/parser/ShortParserTest.java | 3 --
.../types/parser/ShortValueParserTest.java | 2 --
.../types/parser/UnquotedStringParserTest.java | 5 +--
.../types/parser/VarLengthStringParserTest.java | 1 -
.../table/runtime/io/RowCsvInputFormat.scala | 14 +++++---
.../runtime/io/RowCsvInputFormatTest.scala | 37 ++++++++++----------
36 files changed, 77 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java
index 90fa41e..f8b890a 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java
@@ -44,6 +44,10 @@ public class BooleanParser extends FieldParser<Boolean> {
while (i < limit) {
if (i < delimLimit && delimiterNext(bytes, i, delim)) {
+ if (i == startPos) {
+ setErrorState(ParseErrorState.EMPTY_COLUMN);
+ return -1;
+ }
break;
}
i++;
http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java
index a521ac1..7ee257e 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java
@@ -48,7 +48,7 @@ public class ByteParser extends FieldParser<Byte> {
for (int i = startPos; i < limit; i++) {
if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
if (i == startPos) {
- setErrorState(ParseErrorState.EMPTY_STRING);
+ setErrorState(ParseErrorState.EMPTY_COLUMN);
return -1;
}
this.result = (byte) (neg ? -val : val);
http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java
index 4cda98c..c79f5d4 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java
@@ -55,7 +55,7 @@ public class ByteValueParser extends FieldParser<ByteValue> {
if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
if (i == startPos) {
- setErrorState(ParseErrorState.EMPTY_STRING);
+ setErrorState(ParseErrorState.EMPTY_COLUMN);
return -1;
}
reusable.setValue((byte) (neg ? -val : val));
http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
index 0b2f5a2..8af496d 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
@@ -39,6 +39,10 @@ public class DoubleParser extends FieldParser<Double> {
while (i < limit) {
if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
+ if (i == startPos) {
+ setErrorState(ParseErrorState.EMPTY_COLUMN);
+ return -1;
+ }
break;
}
i++;
http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
index 333e6c9..5c657be 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
@@ -39,6 +39,10 @@ public class DoubleValueParser extends FieldParser<DoubleValue> {
while (i < limit) {
if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
+ if (i == startPos) {
+ setErrorState(ParseErrorState.EMPTY_COLUMN);
+ return -1;
+ }
break;
}
i++;
http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
index 67c1bd7..5f17840 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
@@ -69,8 +69,8 @@ public abstract class FieldParser<T> {
/** The parser found characters between the end of the quoted string and the delimiter. */
UNQUOTED_CHARS_AFTER_QUOTED_STRING,
- /** The string is empty. */
- EMPTY_STRING,
+ /** The column is empty. */
+ EMPTY_COLUMN,
/** Invalid Boolean value **/
BOOLEAN_INVALID
http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
index a47877e..3304f24 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
@@ -39,6 +39,10 @@ public class FloatParser extends FieldParser<Float> {
while (i < limit) {
if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
+ if (i == startPos) {
+ setErrorState(ParseErrorState.EMPTY_COLUMN);
+ return -1;
+ }
break;
}
i++;
http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
index b6da3d3..26ee47b 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
@@ -39,6 +39,10 @@ public class FloatValueParser extends FieldParser<FloatValue> {
while (i < limit) {
if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
+ if (i == startPos) {
+ setErrorState(ParseErrorState.EMPTY_COLUMN);
+ return -1;
+ }
break;
}
i++;
http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java
index 4d2ae7c..4e5d43f 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java
@@ -57,7 +57,7 @@ public class IntParser extends FieldParser<Integer> {
for (int i = startPos; i < limit; i++) {
if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
if (i == startPos) {
- setErrorState(ParseErrorState.EMPTY_STRING);
+ setErrorState(ParseErrorState.EMPTY_COLUMN);
return -1;
}
this.result = (int) (neg ? -val : val);
http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java
index d487c66..0229bc7 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java
@@ -57,7 +57,7 @@ public class IntValueParser extends FieldParser<IntValue> {
for (int i = startPos; i < limit; i++) {
if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
if (i == startPos) {
- setErrorState(ParseErrorState.EMPTY_STRING);
+ setErrorState(ParseErrorState.EMPTY_COLUMN);
return -1;
}
reusable.setValue((int) (neg ? -val : val));
http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java
index c7b76d2..79eb080 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java
@@ -51,7 +51,7 @@ public class LongParser extends FieldParser<Long> {
for (int i = startPos; i < limit; i++) {
if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
if (i == startPos) {
- setErrorState(ParseErrorState.EMPTY_STRING);
+ setErrorState(ParseErrorState.EMPTY_COLUMN);
return -1;
}
this.result = neg ? -val : val;
http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java
index 597abc0..5ddd40c 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java
@@ -54,7 +54,7 @@ public class LongValueParser extends FieldParser<LongValue> {
for (int i = startPos; i < limit; i++) {
if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
if (i == startPos) {
- setErrorState(ParseErrorState.EMPTY_STRING);
+ setErrorState(ParseErrorState.EMPTY_COLUMN);
return -1;
}
reusable.setValue(neg ? -val : val);
http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java
index 3afa761..c458a3f 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java
@@ -55,7 +55,7 @@ public class ShortParser extends FieldParser<Short> {
for (int i = startPos; i < limit; i++) {
if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
if (i == startPos) {
- setErrorState(ParseErrorState.EMPTY_STRING);
+ setErrorState(ParseErrorState.EMPTY_COLUMN);
return -1;
}
this.result = (short) (neg ? -val : val);
http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java
index 880af25..47471a3 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java
@@ -57,7 +57,7 @@ public class ShortValueParser extends FieldParser<ShortValue> {
for (int i = startPos; i < limit; i++) {
if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
if (i == startPos) {
- setErrorState(ParseErrorState.EMPTY_STRING);
+ setErrorState(ParseErrorState.EMPTY_COLUMN);
return -1;
}
reusable.setValue((short) (neg ? -val : val));
http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java
index 9cee990..1a2c7e3 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java
@@ -47,7 +47,7 @@ public class StringParser extends FieldParser<String> {
final int delimLimit = limit - delimiter.length + 1;
if(quotedStringParsing && bytes[i] == quoteCharacter) {
- // quoted string parsing enabled and first character Vis a quote
+ // quoted string parsing enabled and first character is a quote
i++;
// search for ending quote character, continue when it is escaped
@@ -84,10 +84,16 @@ public class StringParser extends FieldParser<String> {
if (i >= delimLimit) {
// no delimiter found. Take the full string
+ if (limit == startPos) {
+ setErrorState(ParseErrorState.EMPTY_COLUMN); // mark empty column
+ }
this.result = new String(bytes, startPos, limit - startPos);
return limit;
} else {
// delimiter found.
+ if (i == startPos) {
+ setErrorState(ParseErrorState.EMPTY_COLUMN); // mark empty column
+ }
this.result = new String(bytes, startPos, i - startPos);
return i + delimiter.length;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java
index b136576..c72b029 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java
@@ -90,10 +90,16 @@ public class StringValueParser extends FieldParser<StringValue> {
if (i >= delimLimit) {
// no delimiter found. Take the full string
+ if (limit == startPos) {
+ setErrorState(ParseErrorState.EMPTY_COLUMN); // mark empty column
+ }
reusable.setValueAscii(bytes, startPos, limit - startPos);
return limit;
} else {
// delimiter found.
+ if (i == startPos) {
+ setErrorState(ParseErrorState.EMPTY_COLUMN); // mark empty column
+ }
reusable.setValueAscii(bytes, startPos, i - startPos);
return i + delimiter.length;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/test/java/org/apache/flink/types/parser/BooleanValueParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/BooleanValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/BooleanValueParserTest.java
index ff1885d..3b120e2 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/BooleanValueParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/BooleanValueParserTest.java
@@ -19,9 +19,9 @@
package org.apache.flink.types.parser;
-
import org.apache.flink.types.BooleanValue;
+
public class BooleanValueParserTest extends ParserTestBase<BooleanValue> {
http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java
index dac5144..579f003 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java
@@ -19,9 +19,6 @@
package org.apache.flink.types.parser;
-import org.apache.flink.types.parser.ByteParser;
-import org.apache.flink.types.parser.FieldParser;
-
public class ByteParserTest extends ParserTestBase<Byte> {
http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/test/java/org/apache/flink/types/parser/ByteValueParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/ByteValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/ByteValueParserTest.java
index 31b60d4..f5abe05 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/ByteValueParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/ByteValueParserTest.java
@@ -20,8 +20,6 @@
package org.apache.flink.types.parser;
import org.apache.flink.types.ByteValue;
-import org.apache.flink.types.parser.ByteValueParser;
-import org.apache.flink.types.parser.FieldParser;
public class ByteValueParserTest extends ParserTestBase<ByteValue> {
http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/test/java/org/apache/flink/types/parser/DoubleParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/DoubleParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/DoubleParserTest.java
index bda8252..98655d1 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/DoubleParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/DoubleParserTest.java
@@ -19,9 +19,6 @@
package org.apache.flink.types.parser;
-import org.apache.flink.types.parser.DoubleParser;
-import org.apache.flink.types.parser.FieldParser;
-
public class DoubleParserTest extends ParserTestBase<Double> {
http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/test/java/org/apache/flink/types/parser/DoubleValueParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/DoubleValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/DoubleValueParserTest.java
index fbbb5f2..dfe8936 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/DoubleValueParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/DoubleValueParserTest.java
@@ -20,8 +20,6 @@
package org.apache.flink.types.parser;
import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.parser.DoubleValueParser;
-import org.apache.flink.types.parser.FieldParser;
public class DoubleValueParserTest extends ParserTestBase<DoubleValue> {
http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/test/java/org/apache/flink/types/parser/FloatParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/FloatParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/FloatParserTest.java
index d05557f..480f1fb 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/FloatParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/FloatParserTest.java
@@ -19,9 +19,6 @@
package org.apache.flink.types.parser;
-import org.apache.flink.types.parser.FieldParser;
-import org.apache.flink.types.parser.FloatParser;
-
public class FloatParserTest extends ParserTestBase<Float> {
http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/test/java/org/apache/flink/types/parser/FloatValueParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/FloatValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/FloatValueParserTest.java
index 5c6e1c3..d71b20a 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/FloatValueParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/FloatValueParserTest.java
@@ -20,8 +20,6 @@
package org.apache.flink.types.parser;
import org.apache.flink.types.FloatValue;
-import org.apache.flink.types.parser.FloatValueParser;
-import org.apache.flink.types.parser.FieldParser;
public class FloatValueParserTest extends ParserTestBase<FloatValue> {
http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/test/java/org/apache/flink/types/parser/IntParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/IntParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/IntParserTest.java
index 1d33b51..f587086 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/IntParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/IntParserTest.java
@@ -19,9 +19,6 @@
package org.apache.flink.types.parser;
-import org.apache.flink.types.parser.FieldParser;
-import org.apache.flink.types.parser.IntParser;
-
public class IntParserTest extends ParserTestBase<Integer> {
http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/test/java/org/apache/flink/types/parser/IntValueParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/IntValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/IntValueParserTest.java
index eb0403e..a70d65c 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/IntValueParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/IntValueParserTest.java
@@ -20,8 +20,6 @@
package org.apache.flink.types.parser;
import org.apache.flink.types.IntValue;
-import org.apache.flink.types.parser.IntValueParser;
-import org.apache.flink.types.parser.FieldParser;
public class IntValueParserTest extends ParserTestBase<IntValue> {
http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/test/java/org/apache/flink/types/parser/LongParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/LongParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/LongParserTest.java
index b17de9d..d32eef1 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/LongParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/LongParserTest.java
@@ -19,9 +19,6 @@
package org.apache.flink.types.parser;
-import org.apache.flink.types.parser.FieldParser;
-import org.apache.flink.types.parser.LongParser;
-
public class LongParserTest extends ParserTestBase<Long> {
http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/test/java/org/apache/flink/types/parser/LongValueParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/LongValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/LongValueParserTest.java
index f4d82a0..b9c5eec 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/LongValueParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/LongValueParserTest.java
@@ -20,8 +20,6 @@
package org.apache.flink.types.parser;
import org.apache.flink.types.LongValue;
-import org.apache.flink.types.parser.LongValueParser;
-import org.apache.flink.types.parser.FieldParser;
public class LongValueParserTest extends ParserTestBase<LongValue> {
http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java b/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java
index b979a38..9b02147 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java
@@ -414,6 +414,8 @@ public abstract class ParserTestBase<T> extends TestLogger {
byte[] bytes = emptyString.getBytes();
int numRead = parser.parseField(bytes, 0, bytes.length, new byte[]{'|'}, parser.createValue());
+ assertEquals(FieldParser.ParseErrorState.EMPTY_COLUMN, parser.getErrorState());
+
if(this.allowsEmptyField()) {
assertTrue("Parser declared the empty string as invalid.", numRead != -1);
assertEquals("Invalid number of bytes read returned.", bytes.length, numRead);
http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringParserTest.java
index d4b7e1f..6fda78a 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringParserTest.java
@@ -65,4 +65,4 @@ public class QuotedStringParserTest extends ParserTestBase<String> {
public Class<String> getTypeClass() {
return String.class;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringValueParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringValueParserTest.java
index 2801582..2cf901c 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringValueParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringValueParserTest.java
@@ -20,8 +20,6 @@
package org.apache.flink.types.parser;
import org.apache.flink.types.StringValue;
-import org.apache.flink.types.parser.FieldParser;
-import org.apache.flink.types.parser.StringValueParser;
public class QuotedStringValueParserTest extends ParserTestBase<StringValue> {
@@ -69,4 +67,4 @@ public class QuotedStringValueParserTest extends ParserTestBase<StringValue> {
public Class<StringValue> getTypeClass() {
return StringValue.class;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/test/java/org/apache/flink/types/parser/ShortParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/ShortParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/ShortParserTest.java
index 201714b..822d871 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/ShortParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/ShortParserTest.java
@@ -19,9 +19,6 @@
package org.apache.flink.types.parser;
-import org.apache.flink.types.parser.FieldParser;
-import org.apache.flink.types.parser.ShortParser;
-
public class ShortParserTest extends ParserTestBase<Short> {
http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/test/java/org/apache/flink/types/parser/ShortValueParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/ShortValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/ShortValueParserTest.java
index 59e9c52..c4b5f02 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/ShortValueParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/ShortValueParserTest.java
@@ -20,8 +20,6 @@
package org.apache.flink.types.parser;
import org.apache.flink.types.ShortValue;
-import org.apache.flink.types.parser.ShortValueParser;
-import org.apache.flink.types.parser.FieldParser;
public class ShortValueParserTest extends ParserTestBase<ShortValue> {
http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/test/java/org/apache/flink/types/parser/UnquotedStringParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/UnquotedStringParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/UnquotedStringParserTest.java
index 8e75192..739bd76 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/UnquotedStringParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/UnquotedStringParserTest.java
@@ -19,9 +19,6 @@
package org.apache.flink.types.parser;
-import org.apache.flink.types.parser.StringParser;
-import org.apache.flink.types.parser.FieldParser;
-
public class UnquotedStringParserTest extends ParserTestBase<String> {
@@ -58,4 +55,4 @@ public class UnquotedStringParserTest extends ParserTestBase<String> {
public Class<String> getTypeClass() {
return String.class;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
index 4f9069e..1fe8850 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
@@ -23,7 +23,6 @@ import static org.junit.Assert.assertTrue;
import org.apache.flink.types.StringValue;
import org.apache.flink.types.Value;
-import org.apache.flink.types.parser.StringValueParser;
import org.junit.Test;
public class VarLengthStringParserTest {
http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormat.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormat.scala
index 1eb056c..b0ab801 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormat.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormat.scala
@@ -36,7 +36,8 @@ class RowCsvInputFormat(
rowTypeInfo: RowTypeInfo,
lineDelimiter: String = DEFAULT_LINE_DELIMITER,
fieldDelimiter: String = DEFAULT_FIELD_DELIMITER,
- includedFieldsMask: Array[Boolean] = null)
+ includedFieldsMask: Array[Boolean] = null,
+ emptyColumnAsNull: Boolean = false)
extends CsvInputFormat[Row](filePath) {
if (rowTypeInfo.getArity == 0) {
@@ -134,8 +135,8 @@ class RowCsvInputFormat(
holders(output))
if (!isLenient && (parser.getErrorState ne ParseErrorState.NONE)) {
- // Row is able to handle null values
- if (parser.getErrorState ne ParseErrorState.EMPTY_STRING) {
+ // the error state EMPTY_COLUMN is ignored
+ if (parser.getErrorState ne ParseErrorState.EMPTY_COLUMN) {
throw new ParseException(s"Parsing error for column $field of row '"
+ new String(bytes, offset, numBytes)
+ s"' originated by ${parser.getClass.getSimpleName}: ${parser.getErrorState}.")
@@ -143,8 +144,11 @@ class RowCsvInputFormat(
}
holders(output) = parser.getLastResult
- // check parse result
- if (startPos < 0) {
+ // check parse result:
+ // the result is null if it is invalid
+ // or empty with emptyColumnAsNull enabled
+ if (startPos < 0 ||
+ (emptyColumnAsNull && (parser.getErrorState eq ParseErrorState.EMPTY_COLUMN))) {
holders(output) = null
startPos = skipFields(bytes, latestValidPos, limit, fieldDelimiter)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala
index 540776d..db01b69 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala
@@ -379,39 +379,40 @@ class RowCsvInputFormatTest {
@Test
def testEmptyFields() {
val fileContent =
- "|0|0|0|0|0|\n" +
- "1||1|1|1|1|\n" +
- "2|2||2|2|2|\n" +
- "3|3|3||3|3|\n" +
- "4|4|4|4||4|\n" +
- "5|5|5|5|5||\n"
+ ",,,,,,,,\n" +
+ ",,,,,,,,\n" +
+ ",,,,,,,,\n" +
+ ",,,,,,,,\n" +
+ ",,,,,,,,\n" +
+ ",,,,,,,,\n" +
+ ",,,,,,,,\n" +
+ ",,,,,,,,\n"
val split = createTempFile(fileContent)
- // TODO: FLOAT_TYPE_INFO and DOUBLE_TYPE_INFO don't handle correctly null values
val typeInfo = new RowTypeInfo(Seq(
- BasicTypeInfo.SHORT_TYPE_INFO,
+ BasicTypeInfo.BOOLEAN_TYPE_INFO,
+ BasicTypeInfo.BYTE_TYPE_INFO,
+ BasicTypeInfo.DOUBLE_TYPE_INFO,
+ BasicTypeInfo.FLOAT_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.LONG_TYPE_INFO,
- BasicTypeInfo.INT_TYPE_INFO,
- BasicTypeInfo.INT_TYPE_INFO,
- BasicTypeInfo.BYTE_TYPE_INFO))
+ BasicTypeInfo.SHORT_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO))
- val format = new RowCsvInputFormat(PATH, rowTypeInfo = typeInfo)
- format.setFieldDelimiter("|")
+ val format = new RowCsvInputFormat(PATH, rowTypeInfo = typeInfo, emptyColumnAsNull = true)
+ format.setFieldDelimiter(",")
format.configure(new Configuration)
format.open(split)
- var result = new Row(6)
+ var result = new Row(8)
val linesCnt = fileContent.split("\n").length
- var i = 0
- while (i < linesCnt) {
+ for (i <- 0 until linesCnt) yield {
result = format.nextRecord(result)
assertNull(result.productElement(i))
- i += 1
}
-
+
// ensure no more rows
assertNull(format.nextRecord(result))
assertTrue(format.reachedEnd)