You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@flink.apache.org by fh...@apache.org on 2015/05/19 09:51:14 UTC
[2/2] flink git commit: [FLINK-1820] Consistent behavior of numeric value parsers.
[FLINK-1820] Consistent behavior of numeric value parsers.
This closes #566
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/39d526e6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/39d526e6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/39d526e6
Branch: refs/heads/master
Commit: 39d526e6f8b26ff35e1023c65293982285ffcc78
Parents: 6403dbd
Author: FelixNeutatz <ne...@googlemail.com>
Authored: Fri Apr 3 18:13:32 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue May 19 00:52:29 2015 +0200
----------------------------------------------------------------------
.../apache/flink/types/parser/ByteParser.java | 68 +++++-----
.../flink/types/parser/ByteValueParser.java | 4 +
.../apache/flink/types/parser/DoubleParser.java | 69 +++++-----
.../flink/types/parser/DoubleValueParser.java | 8 +-
.../apache/flink/types/parser/FieldParser.java | 8 +-
.../apache/flink/types/parser/FloatParser.java | 69 +++++-----
.../flink/types/parser/FloatValueParser.java | 8 +-
.../apache/flink/types/parser/IntParser.java | 69 +++++-----
.../flink/types/parser/IntValueParser.java | 4 +
.../apache/flink/types/parser/LongParser.java | 51 ++++----
.../flink/types/parser/LongValueParser.java | 4 +
.../apache/flink/types/parser/ShortParser.java | 47 +++----
.../flink/types/parser/ShortValueParser.java | 4 +
.../flink/types/parser/ByteParserTest.java | 7 +-
.../flink/types/parser/ByteValueParserTest.java | 2 +-
.../flink/types/parser/DoubleParserTest.java | 2 +-
.../types/parser/DoubleValueParserTest.java | 2 +-
.../flink/types/parser/FloatParserTest.java | 2 +-
.../types/parser/FloatValueParserTest.java | 2 +-
.../flink/types/parser/IntParserTest.java | 2 +-
.../flink/types/parser/IntValueParserTest.java | 3 +-
.../flink/types/parser/LongParserTest.java | 2 +-
.../flink/types/parser/LongValueParserTest.java | 2 +-
.../flink/types/parser/ParserTestBase.java | 40 +++++-
.../flink/types/parser/ShortParserTest.java | 2 +-
.../types/parser/ShortValueParserTest.java | 2 +-
.../flink/api/java/io/CsvInputFormatTest.java | 127 +++++++++++++++++--
27 files changed, 405 insertions(+), 205 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java
index 5858da2..09e517a 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java
@@ -21,22 +21,23 @@ package org.apache.flink.types.parser;
public class ByteParser extends FieldParser<Byte> {
-
+
private byte result;
-
+
@Override
public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Byte reusable) {
int val = 0;
boolean neg = false;
- final int delimLimit = limit-delimiter.length+1;
-
+ final int delimLimit = limit - delimiter.length + 1;
+
if (bytes[startPos] == '-') {
neg = true;
startPos++;
-
+
// check for empty field with only the sign
- if (startPos == limit || (startPos < delimLimit && delimiterNext(bytes, startPos, delimiter))) {
+ if (startPos == limit || (startPos < delimLimit && delimiterNext(bytes, startPos,
+ delimiter))) {
setErrorState(ParseErrorState.NUMERIC_VALUE_ORPHAN_SIGN);
return -1;
}
@@ -44,6 +45,10 @@ public class ByteParser extends FieldParser<Byte> {
for (int i = startPos; i < limit; i++) {
if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
+ if (i == startPos) {
+ setErrorState(ParseErrorState.EMPTY_STRING);
+ return -1;
+ }
this.result = (byte) (neg ? -val : val);
return i + delimiter.length;
}
@@ -53,17 +58,17 @@ public class ByteParser extends FieldParser<Byte> {
}
val *= 10;
val += bytes[i] - 48;
-
+
if (val > Byte.MAX_VALUE && (!neg || val > -Byte.MIN_VALUE)) {
setErrorState(ParseErrorState.NUMERIC_VALUE_OVERFLOW_UNDERFLOW);
return -1;
}
}
-
+
this.result = (byte) (neg ? -val : val);
return limit;
}
-
+
@Override
public Byte createValue() {
return Byte.MIN_VALUE;
@@ -73,43 +78,40 @@ public class ByteParser extends FieldParser<Byte> {
public Byte getLastResult() {
return Byte.valueOf(this.result);
}
-
+
/**
- * Static utility to parse a field of type byte from a byte sequence that represents text characters
+ * Static utility to parse a field of type byte from a byte sequence that represents text
+ * characters
* (such as when read from a file stream).
- *
- * @param bytes The bytes containing the text data that should be parsed.
+ *
+ * @param bytes The bytes containing the text data that should be parsed.
* @param startPos The offset to start the parsing.
- * @param length The length of the byte sequence (counting from the offset).
- *
+ * @param length The length of the byte sequence (counting from the offset).
* @return The parsed value.
- *
- * @throws NumberFormatException Thrown when the value cannot be parsed because the text represents not a correct number.
+ * @throws NumberFormatException Thrown when the value cannot be parsed because the text
+ * represents not a correct number.
*/
public static final byte parseField(byte[] bytes, int startPos, int length) {
return parseField(bytes, startPos, length, (char) 0xffff);
}
-
+
/**
- * Static utility to parse a field of type byte from a byte sequence that represents text characters
+ * Static utility to parse a field of type byte from a byte sequence that represents text
+ * characters
* (such as when read from a file stream).
- *
- * @param bytes The bytes containing the text data that should be parsed.
- * @param startPos The offset to start the parsing.
- * @param length The length of the byte sequence (counting from the offset).
+ *
+ * @param bytes The bytes containing the text data that should be parsed.
+ * @param startPos The offset to start the parsing.
+ * @param length The length of the byte sequence (counting from the offset).
* @param delimiter The delimiter that terminates the field.
- *
* @return The parsed value.
- *
- * @throws NumberFormatException Thrown when the value cannot be parsed because the text represents not a correct number.
+ * @throws NumberFormatException Thrown when the value cannot be parsed because the text
+ * represents not a correct number.
*/
public static final byte parseField(byte[] bytes, int startPos, int length, char delimiter) {
- if (length <= 0) {
- throw new NumberFormatException("Invalid input: Empty string");
- }
long val = 0;
boolean neg = false;
-
+
if (bytes[startPos] == '-') {
neg = true;
startPos++;
@@ -118,17 +120,17 @@ public class ByteParser extends FieldParser<Byte> {
throw new NumberFormatException("Orphaned minus sign.");
}
}
-
+
for (; length > 0; startPos++, length--) {
if (bytes[startPos] == delimiter) {
- return (byte) (neg ? -val : val);
+ throw new NumberFormatException("Empty field.");
}
if (bytes[startPos] < 48 || bytes[startPos] > 57) {
throw new NumberFormatException("Invalid character.");
}
val *= 10;
val += bytes[startPos] - 48;
-
+
if (val > Byte.MAX_VALUE && (!neg || val > -Byte.MIN_VALUE)) {
throw new NumberFormatException("Value overflow/underflow");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java
index f9b36e4..612a1cb 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java
@@ -52,6 +52,10 @@ public class ByteValueParser extends FieldParser<ByteValue> {
for (int i = startPos; i < limit; i++) {
if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
+ if (i == startPos) {
+ setErrorState(ParseErrorState.EMPTY_STRING);
+ return -1;
+ }
reusable.setValue((byte) (neg ? -val : val));
return i + delimiter.length;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
index 947fdfe..086c1f5 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
@@ -23,35 +23,39 @@ package org.apache.flink.types.parser;
* Parses a text field into a Double.
*/
public class DoubleParser extends FieldParser<Double> {
-
+
private static final Double DOUBLE_INSTANCE = Double.valueOf(0.0);
-
+
private double result;
-
+
@Override
public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Double reusable) {
int i = startPos;
- final int delimLimit = limit-delimiter.length+1;
-
+ final int delimLimit = limit - delimiter.length + 1;
+
while (i < limit) {
if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
break;
}
i++;
}
-
- String str = new String(bytes, startPos, i-startPos);
+
+ String str = new String(bytes, startPos, i - startPos);
+ int len = str.length();
+ if (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[Math.max(i - 1, 0)])) {
+ setErrorState(ParseErrorState.WHITESPACE_IN_NUMERIC_FIELD);
+ return -1;
+ }
try {
this.result = Double.parseDouble(str);
return (i == limit) ? limit : i + delimiter.length;
- }
- catch (NumberFormatException e) {
+ } catch (NumberFormatException e) {
setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR);
return -1;
}
}
-
+
@Override
public Double createValue() {
return DOUBLE_INSTANCE;
@@ -61,35 +65,35 @@ public class DoubleParser extends FieldParser<Double> {
public Double getLastResult() {
return Double.valueOf(this.result);
}
-
+
/**
- * Static utility to parse a field of type double from a byte sequence that represents text characters
+ * Static utility to parse a field of type double from a byte sequence that represents text
+ * characters
* (such as when read from a file stream).
- *
- * @param bytes The bytes containing the text data that should be parsed.
+ *
+ * @param bytes The bytes containing the text data that should be parsed.
* @param startPos The offset to start the parsing.
- * @param length The length of the byte sequence (counting from the offset).
- *
+ * @param length The length of the byte sequence (counting from the offset).
* @return The parsed value.
- *
- * @throws NumberFormatException Thrown when the value cannot be parsed because the text represents not a correct number.
+ * @throws NumberFormatException Thrown when the value cannot be parsed because the text
+ * represents not a correct number.
*/
public static final double parseField(byte[] bytes, int startPos, int length) {
return parseField(bytes, startPos, length, (char) 0xffff);
}
-
+
/**
- * Static utility to parse a field of type double from a byte sequence that represents text characters
+ * Static utility to parse a field of type double from a byte sequence that represents text
+ * characters
* (such as when read from a file stream).
- *
- * @param bytes The bytes containing the text data that should be parsed.
- * @param startPos The offset to start the parsing.
- * @param length The length of the byte sequence (counting from the offset).
+ *
+ * @param bytes The bytes containing the text data that should be parsed.
+ * @param startPos The offset to start the parsing.
+ * @param length The length of the byte sequence (counting from the offset).
* @param delimiter The delimiter that terminates the field.
- *
* @return The parsed value.
- *
- * @throws NumberFormatException Thrown when the value cannot be parsed because the text represents not a correct number.
+ * @throws NumberFormatException Thrown when the value cannot be parsed because the text
+ * represents not a correct number.
*/
public static final double parseField(byte[] bytes, int startPos, int length, char delimiter) {
if (length <= 0) {
@@ -97,12 +101,17 @@ public class DoubleParser extends FieldParser<Double> {
}
int i = 0;
final byte delByte = (byte) delimiter;
-
+
while (i < length && bytes[i] != delByte) {
i++;
}
-
- String str = new String(bytes, startPos, i);
+
+ String str = new String(bytes, startPos, i - startPos);
+ int len = str.length();
+ if (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[Math.max(i - 1, 0)])) {
+ throw new NumberFormatException("There is leading or trailing whitespace in the " +
+ "numeric field: " + str);
+ }
return Double.parseDouble(str);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
index e225c1f..7751831 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
@@ -33,7 +33,7 @@ public class DoubleValueParser extends FieldParser<DoubleValue> {
int i = startPos;
- final int delimLimit = limit-delimiter.length+1;
+ final int delimLimit = limit - delimiter.length + 1;
while (i < limit) {
if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
@@ -42,7 +42,11 @@ public class DoubleValueParser extends FieldParser<DoubleValue> {
i++;
}
- String str = new String(bytes, startPos, i-startPos);
+ String str = new String(bytes, startPos, i - startPos);
+ if (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[Math.max(i - 1, 0)])) {
+ setErrorState(ParseErrorState.WHITESPACE_IN_NUMERIC_FIELD);
+ return -1;
+ }
try {
double value = Double.parseDouble(str);
reusable.setValue(value);
http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
index 33697fd..55e9915 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
@@ -64,7 +64,13 @@ public abstract class FieldParser<T> {
UNTERMINATED_QUOTED_STRING,
/** The parser found characters between the end of the quoted string and the delimiter. */
- UNQUOTED_CHARS_AFTER_QUOTED_STRING
+ UNQUOTED_CHARS_AFTER_QUOTED_STRING,
+
+ /** The string is empty. */
+ EMPTY_STRING,
+
+ /** There is whitespace in a numeric field. */
+ WHITESPACE_IN_NUMERIC_FIELD
}
private ParseErrorState errorState = ParseErrorState.NONE;
http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
index 7d166c7..be98aa1 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
@@ -23,15 +23,16 @@ package org.apache.flink.types.parser;
* Parses a text field into a {@link Float}.
*/
public class FloatParser extends FieldParser<Float> {
-
+
private float result;
@Override
- public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Float reusable) {
-
+ public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Float
+ reusable) {
+
int i = startPos;
- final int delimLimit = limit-delimiter.length+1;
+ final int delimLimit = limit - delimiter.length + 1;
while (i < limit) {
if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
@@ -39,18 +40,23 @@ public class FloatParser extends FieldParser<Float> {
}
i++;
}
-
- String str = new String(bytes, startPos, i-startPos);
+
+ if (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[Math.max(i - 1, 0)])) {
+ setErrorState(ParseErrorState.WHITESPACE_IN_NUMERIC_FIELD);
+ return -1;
+ }
+
+ String str = new String(bytes, startPos, i - startPos);
+ int len = str.length();
try {
this.result = Float.parseFloat(str);
- return (i == limit) ? limit : i+ delimiter.length;
- }
- catch (NumberFormatException e) {
+ return (i == limit) ? limit : i + delimiter.length;
+ } catch (NumberFormatException e) {
setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR);
return -1;
}
}
-
+
@Override
public Float createValue() {
return Float.MIN_VALUE;
@@ -60,35 +66,35 @@ public class FloatParser extends FieldParser<Float> {
public Float getLastResult() {
return Float.valueOf(this.result);
}
-
+
/**
- * Static utility to parse a field of type float from a byte sequence that represents text characters
+ * Static utility to parse a field of type float from a byte sequence that represents text
+ * characters
* (such as when read from a file stream).
- *
- * @param bytes The bytes containing the text data that should be parsed.
+ *
+ * @param bytes The bytes containing the text data that should be parsed.
* @param startPos The offset to start the parsing.
- * @param length The length of the byte sequence (counting from the offset).
- *
+ * @param length The length of the byte sequence (counting from the offset).
* @return The parsed value.
- *
- * @throws NumberFormatException Thrown when the value cannot be parsed because the text represents not a correct number.
+ * @throws NumberFormatException Thrown when the value cannot be parsed because the text
+ * represents not a correct number.
*/
public static final float parseField(byte[] bytes, int startPos, int length) {
return parseField(bytes, startPos, length, (char) 0xffff);
}
-
+
/**
- * Static utility to parse a field of type float from a byte sequence that represents text characters
+ * Static utility to parse a field of type float from a byte sequence that represents text
+ * characters
* (such as when read from a file stream).
- *
- * @param bytes The bytes containing the text data that should be parsed.
- * @param startPos The offset to start the parsing.
- * @param length The length of the byte sequence (counting from the offset).
+ *
+ * @param bytes The bytes containing the text data that should be parsed.
+ * @param startPos The offset to start the parsing.
+ * @param length The length of the byte sequence (counting from the offset).
* @param delimiter The delimiter that terminates the field.
- *
* @return The parsed value.
- *
- * @throws NumberFormatException Thrown when the value cannot be parsed because the text represents not a correct number.
+ * @throws NumberFormatException Thrown when the value cannot be parsed because the text
+ * represents not a correct number.
*/
public static final float parseField(byte[] bytes, int startPos, int length, char delimiter) {
if (length <= 0) {
@@ -96,12 +102,17 @@ public class FloatParser extends FieldParser<Float> {
}
int i = 0;
final byte delByte = (byte) delimiter;
-
+
while (i < length && bytes[i] != delByte) {
i++;
}
- String str = new String(bytes, startPos, i);
+ String str = new String(bytes, startPos, i - startPos);
+ if (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[Math.max(i - 1, 0)])) {
+ throw new NumberFormatException("There is leading or trailing whitespace in the " +
+ "numeric field: " + str);
+ }
+ int len = str.length();
return Float.parseFloat(str);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
index af16d4c..e8caac2 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
@@ -33,7 +33,7 @@ public class FloatValueParser extends FieldParser<FloatValue> {
int i = startPos;
- final int delimLimit = limit-delimiter.length+1;
+ final int delimLimit = limit - delimiter.length + 1;
while (i < limit) {
if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
@@ -42,7 +42,11 @@ public class FloatValueParser extends FieldParser<FloatValue> {
i++;
}
- String str = new String(bytes, startPos, i-startPos);
+ String str = new String(bytes, startPos, i - startPos);
+ if (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[Math.max(i - 1, 0)])) {
+ setErrorState(ParseErrorState.WHITESPACE_IN_NUMERIC_FIELD);
+ return -1;
+ }
try {
float value = Float.parseFloat(str);
reusable.setValue(value);
http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java
index c871f4a..dcd2ec2 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java
@@ -25,32 +25,38 @@ package org.apache.flink.types.parser;
* The parser does not check for the maximum value.
*/
public class IntParser extends FieldParser<Integer> {
-
+
private static final long OVERFLOW_BOUND = 0x7fffffffL;
private static final long UNDERFLOW_BOUND = 0x80000000L;
private int result;
-
+
@Override
- public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Integer reusable) {
+ public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Integer
+ reusable) {
long val = 0;
boolean neg = false;
- final int delimLimit = limit-delimiter.length+1;
+ final int delimLimit = limit - delimiter.length + 1;
if (bytes[startPos] == '-') {
neg = true;
startPos++;
-
+
// check for empty field with only the sign
- if (startPos == limit || ( startPos < delimLimit && delimiterNext(bytes, startPos, delimiter))) {
+ if (startPos == limit || (startPos < delimLimit && delimiterNext(bytes, startPos,
+ delimiter))) {
setErrorState(ParseErrorState.NUMERIC_VALUE_ORPHAN_SIGN);
return -1;
}
}
-
+
for (int i = startPos; i < limit; i++) {
if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
+ if (i == startPos) {
+ setErrorState(ParseErrorState.EMPTY_STRING);
+ return -1;
+ }
this.result = (int) (neg ? -val : val);
return i + delimiter.length;
}
@@ -60,17 +66,17 @@ public class IntParser extends FieldParser<Integer> {
}
val *= 10;
val += bytes[i] - 48;
-
+
if (val > OVERFLOW_BOUND && (!neg || val > UNDERFLOW_BOUND)) {
setErrorState(ParseErrorState.NUMERIC_VALUE_OVERFLOW_UNDERFLOW);
return -1;
}
}
-
+
this.result = (int) (neg ? -val : val);
return limit;
}
-
+
@Override
public Integer createValue() {
return Integer.MIN_VALUE;
@@ -80,40 +86,37 @@ public class IntParser extends FieldParser<Integer> {
public Integer getLastResult() {
return Integer.valueOf(this.result);
}
-
+
/**
- * Static utility to parse a field of type int from a byte sequence that represents text characters
+ * Static utility to parse a field of type int from a byte sequence that represents text
+ * characters
* (such as when read from a file stream).
- *
- * @param bytes The bytes containing the text data that should be parsed.
+ *
+ * @param bytes The bytes containing the text data that should be parsed.
* @param startPos The offset to start the parsing.
- * @param length The length of the byte sequence (counting from the offset).
- *
+ * @param length The length of the byte sequence (counting from the offset).
* @return The parsed value.
- *
- * @throws NumberFormatException Thrown when the value cannot be parsed because the text represents not a correct number.
+ * @throws NumberFormatException Thrown when the value cannot be parsed because the text
+ * represents not a correct number.
*/
public static final int parseField(byte[] bytes, int startPos, int length) {
return parseField(bytes, startPos, length, (char) 0xffff);
}
-
+
/**
- * Static utility to parse a field of type int from a byte sequence that represents text characters
+ * Static utility to parse a field of type int from a byte sequence that represents text
+ * characters
* (such as when read from a file stream).
- *
- * @param bytes The bytes containing the text data that should be parsed.
- * @param startPos The offset to start the parsing.
- * @param length The length of the byte sequence (counting from the offset).
+ *
+ * @param bytes The bytes containing the text data that should be parsed.
+ * @param startPos The offset to start the parsing.
+ * @param length The length of the byte sequence (counting from the offset).
* @param delimiter The delimiter that terminates the field.
- *
* @return The parsed value.
- *
- * @throws NumberFormatException Thrown when the value cannot be parsed because the text represents not a correct number.
+ * @throws NumberFormatException Thrown when the value cannot be parsed because the text
+ * represents not a correct number.
*/
public static final int parseField(byte[] bytes, int startPos, int length, char delimiter) {
- if (length <= 0) {
- throw new NumberFormatException("Invalid input: Empty string");
- }
long val = 0;
boolean neg = false;
@@ -125,17 +128,17 @@ public class IntParser extends FieldParser<Integer> {
throw new NumberFormatException("Orphaned minus sign.");
}
}
-
+
for (; length > 0; startPos++, length--) {
if (bytes[startPos] == delimiter) {
- return (int) (neg ? -val : val);
+ throw new NumberFormatException("Empty field.");
}
if (bytes[startPos] < 48 || bytes[startPos] > 57) {
throw new NumberFormatException("Invalid character.");
}
val *= 10;
val += bytes[startPos] - 48;
-
+
if (val > OVERFLOW_BOUND && (!neg || val > UNDERFLOW_BOUND)) {
throw new NumberFormatException("Value overflow/underflow");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java
index 8cb8176..abd8615 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java
@@ -54,6 +54,10 @@ public class IntValueParser extends FieldParser<IntValue> {
for (int i = startPos; i < limit; i++) {
if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
+ if (i == startPos) {
+ setErrorState(ParseErrorState.EMPTY_STRING);
+ return -1;
+ }
reusable.setValue((int) (neg ? -val : val));
return i + delimiter.length;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java
index af17f15..bb6c7c9 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java
@@ -24,9 +24,9 @@ package org.apache.flink.types.parser;
* Only characters '1' to '0' and '-' are allowed.
*/
public class LongParser extends FieldParser<Long> {
-
+
private long result;
-
+
@Override
public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Long reusable) {
long val = 0;
@@ -37,16 +37,20 @@ public class LongParser extends FieldParser<Long> {
if (bytes[startPos] == '-') {
neg = true;
startPos++;
-
+
// check for empty field with only the sign
if (startPos == limit || (startPos < delimLimit && delimiterNext(bytes, startPos, delimiter))) {
setErrorState(ParseErrorState.NUMERIC_VALUE_ORPHAN_SIGN);
return -1;
}
}
-
+
for (int i = startPos; i < limit; i++) {
if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
+ if (i == startPos) {
+ setErrorState(ParseErrorState.EMPTY_STRING);
+ return -1;
+ }
this.result = neg ? -val : val;
return i + delimiter.length;
}
@@ -56,15 +60,15 @@ public class LongParser extends FieldParser<Long> {
}
val *= 10;
val += bytes[i] - 48;
-
+
// check for overflow / underflow
if (val < 0) {
// this is an overflow/underflow, unless we hit exactly the Long.MIN_VALUE
if (neg && val == Long.MIN_VALUE) {
this.result = Long.MIN_VALUE;
-
+
if (i+1 >= limit) {
- return limit;
+ return limit;
} else if (i+1 < delimLimit && delimiterNext(bytes, i+1, delimiter)) {
return i + 1 + delimiter.length;
} else {
@@ -78,57 +82,54 @@ public class LongParser extends FieldParser<Long> {
}
}
}
-
+
this.result = neg ? -val : val;
return limit;
}
-
+
@Override
public Long createValue() {
return Long.MIN_VALUE;
}
-
+
@Override
public Long getLastResult() {
return Long.valueOf(this.result);
}
-
+
/**
* Static utility to parse a field of type long from a byte sequence that represents text characters
* (such as when read from a file stream).
- *
+ *
* @param bytes The bytes containing the text data that should be parsed.
* @param startPos The offset to start the parsing.
* @param length The length of the byte sequence (counting from the offset).
- *
+ *
* @return The parsed value.
- *
+ *
* @throws NumberFormatException Thrown when the value cannot be parsed because the text represents not a correct number.
*/
public static final long parseField(byte[] bytes, int startPos, int length) {
return parseField(bytes, startPos, length, (char) 0xffff);
}
-
+
/**
* Static utility to parse a field of type long from a byte sequence that represents text characters
* (such as when read from a file stream).
- *
+ *
* @param bytes The bytes containing the text data that should be parsed.
* @param startPos The offset to start the parsing.
* @param length The length of the byte sequence (counting from the offset).
* @param delimiter The delimiter that terminates the field.
- *
+ *
* @return The parsed value.
- *
+ *
* @throws NumberFormatException Thrown when the value cannot be parsed because the text represents not a correct number.
*/
public static final long parseField(byte[] bytes, int startPos, int length, char delimiter) {
- if (length <= 0) {
- throw new NumberFormatException("Invalid input: Empty string");
- }
long val = 0;
boolean neg = false;
-
+
if (bytes[startPos] == '-') {
neg = true;
startPos++;
@@ -137,17 +138,17 @@ public class LongParser extends FieldParser<Long> {
throw new NumberFormatException("Orphaned minus sign.");
}
}
-
+
for (; length > 0; startPos++, length--) {
if (bytes[startPos] == delimiter) {
- return neg ? -val : val;
+ throw new NumberFormatException("Empty field.");
}
if (bytes[startPos] < 48 || bytes[startPos] > 57) {
throw new NumberFormatException("Invalid character.");
}
val *= 10;
val += bytes[startPos] - 48;
-
+
// check for overflow / underflow
if (val < 0) {
// this is an overflow/underflow, unless we hit exactly the Long.MIN_VALUE
http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java
index 8b697cc..a99a86e 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java
@@ -51,6 +51,10 @@ public class LongValueParser extends FieldParser<LongValue> {
for (int i = startPos; i < limit; i++) {
if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
+ if (i == startPos) {
+ setErrorState(ParseErrorState.EMPTY_STRING);
+ return -1;
+ }
reusable.setValue(neg ? -val : val);
return i + delimiter.length;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java
index a6f9898..6e04d60 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java
@@ -25,10 +25,10 @@ package org.apache.flink.types.parser;
* The parser does not check for the maximum value.
*/
public class ShortParser extends FieldParser<Short> {
-
+
private static final int OVERFLOW_BOUND = 0x7fff;
private static final int UNDERFLOW_BOUND = 0x8000;
-
+
private short result;
@Override
@@ -37,20 +37,24 @@ public class ShortParser extends FieldParser<Short> {
boolean neg = false;
final int delimLimit = limit-delimiter.length+1;
-
+
if (bytes[startPos] == '-') {
neg = true;
startPos++;
-
+
// check for empty field with only the sign
if (startPos == limit || (startPos < delimLimit && delimiterNext(bytes, startPos, delimiter))) {
setErrorState(ParseErrorState.NUMERIC_VALUE_ORPHAN_SIGN);
return -1;
}
}
-
+
for (int i = startPos; i < limit; i++) {
if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
+ if (i == startPos) {
+ setErrorState(ParseErrorState.EMPTY_STRING);
+ return -1;
+ }
this.result = (short) (neg ? -val : val);
return i + delimiter.length;
}
@@ -60,17 +64,17 @@ public class ShortParser extends FieldParser<Short> {
}
val *= 10;
val += bytes[i] - 48;
-
+
if (val > OVERFLOW_BOUND && (!neg || val > UNDERFLOW_BOUND)) {
setErrorState(ParseErrorState.NUMERIC_VALUE_OVERFLOW_UNDERFLOW);
return -1;
}
}
-
+
this.result = (short) (neg ? -val : val);
return limit;
}
-
+
@Override
public Short createValue() {
return Short.MIN_VALUE;
@@ -80,43 +84,40 @@ public class ShortParser extends FieldParser<Short> {
public Short getLastResult() {
return Short.valueOf(this.result);
}
-
+
/**
* Static utility to parse a field of type short from a byte sequence that represents text characters
* (such as when read from a file stream).
- *
+ *
* @param bytes The bytes containing the text data that should be parsed.
* @param startPos The offset to start the parsing.
* @param length The length of the byte sequence (counting from the offset).
- *
+ *
* @return The parsed value.
- *
+ *
* @throws NumberFormatException Thrown when the value cannot be parsed because the text represents not a correct number.
*/
public static final short parseField(byte[] bytes, int startPos, int length) {
return parseField(bytes, startPos, length, (char) 0xffff);
}
-
+
/**
* Static utility to parse a field of type short from a byte sequence that represents text characters
* (such as when read from a file stream).
- *
+ *
* @param bytes The bytes containing the text data that should be parsed.
* @param startPos The offset to start the parsing.
* @param length The length of the byte sequence (counting from the offset).
* @param delimiter The delimiter that terminates the field.
- *
+ *
* @return The parsed value.
- *
+ *
* @throws NumberFormatException Thrown when the value cannot be parsed because the text represents not a correct number.
*/
public static final short parseField(byte[] bytes, int startPos, int length, char delimiter) {
- if (length <= 0) {
- throw new NumberFormatException("Invalid input: Empty string");
- }
long val = 0;
boolean neg = false;
-
+
if (bytes[startPos] == '-') {
neg = true;
startPos++;
@@ -125,17 +126,17 @@ public class ShortParser extends FieldParser<Short> {
throw new NumberFormatException("Orphaned minus sign.");
}
}
-
+
for (; length > 0; startPos++, length--) {
if (bytes[startPos] == delimiter) {
- return (short) (neg ? -val : val);
+ throw new NumberFormatException("Empty field.");
}
if (bytes[startPos] < 48 || bytes[startPos] > 57) {
throw new NumberFormatException("Invalid character.");
}
val *= 10;
val += bytes[startPos] - 48;
-
+
if (val > OVERFLOW_BOUND && (!neg || val > UNDERFLOW_BOUND)) {
throw new NumberFormatException("Value overflow/underflow");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java
index f5168cc..4289d1a 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java
@@ -54,6 +54,10 @@ public class ShortValueParser extends FieldParser<ShortValue> {
for (int i = startPos; i < limit; i++) {
if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
+ if (i == startPos) {
+ setErrorState(ParseErrorState.EMPTY_STRING);
+ return -1;
+ }
reusable.setValue((short) (neg ? -val : val));
return i + delimiter.length;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java
index 37d6903..ac49783 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java
@@ -22,6 +22,10 @@ package org.apache.flink.types.parser;
import org.apache.flink.types.parser.ByteParser;
import org.apache.flink.types.parser.FieldParser;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
public class ByteParserTest extends ParserTestBase<Byte> {
@@ -43,7 +47,7 @@ public class ByteParserTest extends ParserTestBase<Byte> {
public String[] getInvalidTestValues() {
return new String[] {
"a", "9a", "-57-6", "7-88", String.valueOf(Byte.MAX_VALUE) + "0", String.valueOf(Short.MIN_VALUE),
- String.valueOf(Byte.MAX_VALUE + 1), String.valueOf(Byte.MIN_VALUE - 1)
+ String.valueOf(Byte.MAX_VALUE + 1), String.valueOf(Byte.MIN_VALUE - 1), " 1", "2 ", " ", "\t"
};
}
@@ -56,4 +60,5 @@ public class ByteParserTest extends ParserTestBase<Byte> {
public Class<Byte> getTypeClass() {
return Byte.class;
}
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/test/java/org/apache/flink/types/parser/ByteValueParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/ByteValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/ByteValueParserTest.java
index a6c315a..1df3429 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/ByteValueParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/ByteValueParserTest.java
@@ -45,7 +45,7 @@ public class ByteValueParserTest extends ParserTestBase<ByteValue> {
public String[] getInvalidTestValues() {
return new String[] {
"a", "9a", "-57-6", "7-88", String.valueOf(Byte.MAX_VALUE) + "0", String.valueOf(Short.MIN_VALUE),
- String.valueOf(Byte.MAX_VALUE + 1), String.valueOf(Byte.MIN_VALUE - 1)
+ String.valueOf(Byte.MAX_VALUE + 1), String.valueOf(Byte.MIN_VALUE - 1), " 1", "2 ", " ", "\t"
};
}
http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/test/java/org/apache/flink/types/parser/DoubleParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/DoubleParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/DoubleParserTest.java
index 71e78a0..c68dd43 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/DoubleParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/DoubleParserTest.java
@@ -50,7 +50,7 @@ public class DoubleParserTest extends ParserTestBase<Double> {
@Override
public String[] getInvalidTestValues() {
return new String[] {
- "a", "123abc4", "-57-6", "7-877678"
+ "a", "123abc4", "-57-6", "7-877678", " 1", "2 ", " ", "\t"
};
}
http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/test/java/org/apache/flink/types/parser/DoubleValueParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/DoubleValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/DoubleValueParserTest.java
index 120dfac..7908180 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/DoubleValueParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/DoubleValueParserTest.java
@@ -52,7 +52,7 @@ public class DoubleValueParserTest extends ParserTestBase<DoubleValue> {
@Override
public String[] getInvalidTestValues() {
return new String[] {
- "a", "123abc4", "-57-6", "7-877678"
+ "a", "123abc4", "-57-6", "7-877678", " 1", "2 ", " ", "\t"
};
}
http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/test/java/org/apache/flink/types/parser/FloatParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/FloatParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/FloatParserTest.java
index 3c450a5..012e353 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/FloatParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/FloatParserTest.java
@@ -50,7 +50,7 @@ public class FloatParserTest extends ParserTestBase<Float> {
@Override
public String[] getInvalidTestValues() {
return new String[] {
- "a", "123abc4", "-57-6", "7-877678"
+ "a", "123abc4", "-57-6", "7-877678", " 1", "2 ", " ", "\t"
};
}
http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/test/java/org/apache/flink/types/parser/FloatValueParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/FloatValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/FloatValueParserTest.java
index be5b5b8..2b85de0 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/FloatValueParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/FloatValueParserTest.java
@@ -52,7 +52,7 @@ public class FloatValueParserTest extends ParserTestBase<FloatValue> {
@Override
public String[] getInvalidTestValues() {
return new String[] {
- "a", "123abc4", "-57-6", "7-877678"
+ "a", "123abc4", "-57-6", "7-877678", " 1", "2 ", " ", "\t"
};
}
http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/test/java/org/apache/flink/types/parser/IntParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/IntParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/IntParserTest.java
index 6e1d4db..0f11fbd 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/IntParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/IntParserTest.java
@@ -43,7 +43,7 @@ public class IntParserTest extends ParserTestBase<Integer> {
public String[] getInvalidTestValues() {
return new String[] {
"a", "1569a86", "-57-6", "7-877678", String.valueOf(Integer.MAX_VALUE) + "0", String.valueOf(Long.MIN_VALUE),
- String.valueOf(((long) Integer.MAX_VALUE) + 1), String.valueOf(((long) Integer.MIN_VALUE) - 1)
+ String.valueOf(((long) Integer.MAX_VALUE) + 1), String.valueOf(((long) Integer.MIN_VALUE) - 1), " 1", "2 ", " ", "\t"
};
}
http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/test/java/org/apache/flink/types/parser/IntValueParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/IntValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/IntValueParserTest.java
index e32f704..2b6d72e 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/IntValueParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/IntValueParserTest.java
@@ -45,7 +45,8 @@ public class IntValueParserTest extends ParserTestBase<IntValue> {
public String[] getInvalidTestValues() {
return new String[] {
"a", "1569a86", "-57-6", "7-877678", String.valueOf(Integer.MAX_VALUE) + "0", String.valueOf(Long.MIN_VALUE),
- String.valueOf(((long) Integer.MAX_VALUE) + 1), String.valueOf(((long) Integer.MIN_VALUE) - 1)
+ String.valueOf(((long) Integer.MAX_VALUE) + 1), String.valueOf(((long) Integer.MIN_VALUE) - 1),
+ " 1", "2 ", " ", "\t"
};
}
http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/test/java/org/apache/flink/types/parser/LongParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/LongParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/LongParserTest.java
index 4dd116b..2f7ac8f 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/LongParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/LongParserTest.java
@@ -45,7 +45,7 @@ public class LongParserTest extends ParserTestBase<Long> {
public String[] getInvalidTestValues() {
return new String[] {
"a", "1569a86", "-57-6", "7-877678", String.valueOf(Long.MAX_VALUE) + "0", String.valueOf(Long.MIN_VALUE) + "0",
- "9223372036854775808", "-9223372036854775809"
+ "9223372036854775808", "-9223372036854775809", " 1", "2 ", " ", "\t"
};
}
http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/test/java/org/apache/flink/types/parser/LongValueParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/LongValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/LongValueParserTest.java
index fac6f42..2000907 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/LongValueParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/LongValueParserTest.java
@@ -47,7 +47,7 @@ public class LongValueParserTest extends ParserTestBase<LongValue> {
public String[] getInvalidTestValues() {
return new String[] {
"a", "1569a86", "-57-6", "7-877678", String.valueOf(Long.MAX_VALUE) + "0", String.valueOf(Long.MIN_VALUE) + "0",
- "9223372036854775808", "-9223372036854775809"
+ "9223372036854775808", "-9223372036854775809", " 1", "2 ", " ", "\t"
};
}
http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java b/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java
index fb56add..dabac6f 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java
@@ -25,7 +25,9 @@ import static org.junit.Assert.fail;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
+import java.util.Arrays;
+import org.apache.flink.types.StringValue;
import org.apache.flink.types.parser.FieldParser;
import org.junit.Test;
@@ -45,7 +47,6 @@ public abstract class ParserTestBase<T> {
public abstract Class<T> getTypeClass();
-
@Test
public void testTest() {
assertNotNull(getParser());
@@ -243,7 +244,7 @@ public abstract class ParserTestBase<T> {
FieldParser<T> parser = getParser();
byte[] bytes = testValues[i].getBytes();
- int numRead = parser.parseField(bytes, 0, bytes.length, new byte[] {'|'}, parser.createValue());
+ int numRead = parser.parseField(bytes, 0, bytes.length, new byte[]{'|'}, parser.createValue());
assertTrue("Parser accepted the invalid value " + testValues[i] + ".", numRead == -1);
}
@@ -402,4 +403,39 @@ public abstract class ParserTestBase<T> {
return result;
}
+ @Test
+ public void testEmptyFieldInIsolation() {
+ try {
+ String [] emptyStrings = new String[] {"|"};
+
+ FieldParser<T> parser = getParser();
+
+ for (String emptyString : emptyStrings) {
+ byte[] bytes = emptyString.getBytes();
+ int numRead = parser.parseField(bytes, 0, bytes.length, new byte[]{'|'}, parser.createValue());
+
+ if (getTypeClass() == String.class) {
+ assertTrue("Parser declared the empty string as invalid.", numRead != -1);
+ assertEquals("Invalid number of bytes read returned.", bytes.length, numRead);
+
+ T result = parser.getLastResult();
+ assertEquals("Parser parsed wrong.", "", result);
+ } else if(getTypeClass() == StringValue.class) {
+ assertTrue("Parser declared the empty string as invalid.", numRead != -1);
+ assertEquals("Invalid number of bytes read returned.", bytes.length, numRead);
+
+ T result = parser.getLastResult();
+ assertEquals("Parser parsed wrong.", new StringValue(""), result);
+ } else {
+ assertTrue("Parser accepted the empty string.", numRead == -1);
+ }
+ }
+ }
+ catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ fail("Test erroneous: " + e.getMessage());
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/test/java/org/apache/flink/types/parser/ShortParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/ShortParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/ShortParserTest.java
index 3f4cd02..baea30f 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/ShortParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/ShortParserTest.java
@@ -43,7 +43,7 @@ public class ShortParserTest extends ParserTestBase<Short> {
public String[] getInvalidTestValues() {
return new String[] {
"a", "1569a86", "-57-6", "7-877678", String.valueOf(Short.MAX_VALUE) + "0", String.valueOf(Integer.MIN_VALUE),
- String.valueOf(Short.MAX_VALUE + 1), String.valueOf(Short.MIN_VALUE - 1)
+ String.valueOf(Short.MAX_VALUE + 1), String.valueOf(Short.MIN_VALUE - 1), " 1", "2 ", " ", "\t"
};
}
http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/test/java/org/apache/flink/types/parser/ShortValueParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/ShortValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/ShortValueParserTest.java
index 44f1589..c56df83 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/ShortValueParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/ShortValueParserTest.java
@@ -46,7 +46,7 @@ public class ShortValueParserTest extends ParserTestBase<ShortValue> {
public String[] getInvalidTestValues() {
return new String[] {
"a", "1569a86", "-57-6", "7-877678", String.valueOf(Short.MAX_VALUE) + "0", String.valueOf(Integer.MIN_VALUE),
- String.valueOf(Short.MAX_VALUE + 1), String.valueOf(Short.MIN_VALUE - 1)
+ String.valueOf(Short.MAX_VALUE + 1), String.valueOf(Short.MIN_VALUE - 1), " 1", "2 ", " ", "\t"
};
}
http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
index bff3fec..3d87984 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.java.io;
import com.google.common.base.Charsets;
+import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.*;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
@@ -320,24 +321,24 @@ public class CsvInputFormatTest {
fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
}
}
-
+
@Test
- public void testIntegerFieldsl() throws IOException {
+ public void testIntegerFields() throws IOException {
try {
final String fileContent = "111|222|333|444|555\n666|777|888|999|000|\n";
- final FileInputSplit split = createTempFile(fileContent);
+ final FileInputSplit split = createTempFile(fileContent);
final TupleTypeInfo<Tuple5<Integer, Integer, Integer, Integer, Integer>> typeInfo =
- TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class, Integer.class, Integer.class, Integer.class);
+ TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class, Integer.class, Integer.class, Integer.class);
final CsvInputFormat<Tuple5<Integer, Integer, Integer, Integer, Integer>> format = new CsvInputFormat<Tuple5<Integer, Integer, Integer, Integer, Integer>>(PATH, typeInfo);
-
+
format.setFieldDelimiter("|");
format.configure(new Configuration());
format.open(split);
-
+
Tuple5<Integer, Integer, Integer, Integer, Integer> result = new Tuple5<Integer, Integer, Integer, Integer, Integer>();
-
+
result = format.nextRecord(result);
assertNotNull(result);
assertEquals(Integer.valueOf(111), result.f0);
@@ -345,7 +346,7 @@ public class CsvInputFormatTest {
assertEquals(Integer.valueOf(333), result.f2);
assertEquals(Integer.valueOf(444), result.f3);
assertEquals(Integer.valueOf(555), result.f4);
-
+
result = format.nextRecord(result);
assertNotNull(result);
assertEquals(Integer.valueOf(666), result.f0);
@@ -353,6 +354,104 @@ public class CsvInputFormatTest {
assertEquals(Integer.valueOf(888), result.f2);
assertEquals(Integer.valueOf(999), result.f3);
assertEquals(Integer.valueOf(000), result.f4);
+
+ result = format.nextRecord(result);
+ assertNull(result);
+ assertTrue(format.reachedEnd());
+ }
+ catch (Exception ex) {
+ fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
+ }
+ }
+
+ @Test
+ public void testEmptyFields() throws IOException {
+ try {
+ final String fileContent = "|0|0|0|0|0|\n" +
+ "1||1|1|1|1|\n" +
+ "2|2||2|2|2|\n" +
+ "3|3|3| |3|3|\n" +
+ "4|4|4|4||4|\n" +
+ "5|5|5|5|5||\n";
+ final FileInputSplit split = createTempFile(fileContent);
+
+ final TupleTypeInfo<Tuple6<Short, Integer, Long, Float, Double, Byte>> typeInfo =
+ TupleTypeInfo.getBasicTupleTypeInfo(Short.class, Integer.class, Long.class, Float.class, Double.class, Byte.class);
+ final CsvInputFormat<Tuple6<Short, Integer, Long, Float, Double, Byte>> format = new CsvInputFormat<Tuple6<Short, Integer, Long, Float, Double, Byte>>(PATH, typeInfo);
+
+ format.setFieldDelimiter("|");
+
+ format.configure(new Configuration());
+ format.open(split);
+
+ Tuple6<Short, Integer, Long, Float, Double, Byte> result = new Tuple6<Short, Integer, Long, Float, Double, Byte>();
+
+ try {
+ result = format.nextRecord(result);
+ fail("Empty String Parse Exception was not thrown! (ShortParser)");
+ } catch (ParseException e) {}
+ try {
+ result = format.nextRecord(result);
+ fail("Empty String Parse Exception was not thrown! (IntegerParser)");
+ } catch (ParseException e) {}
+ try {
+ result = format.nextRecord(result);
+ fail("Empty String Parse Exception was not thrown! (LongParser)");
+ } catch (ParseException e) {}
+ try {
+ result = format.nextRecord(result);
+ fail("Empty String Parse Exception was not thrown! (FloatParser)");
+ } catch (ParseException e) {}
+ try {
+ result = format.nextRecord(result);
+ fail("Empty String Parse Exception was not thrown! (DoubleParser)");
+ } catch (ParseException e) {}
+ try {
+ result = format.nextRecord(result);
+ fail("Empty String Parse Exception was not thrown! (ByteParser)");
+ } catch (ParseException e) {}
+
+ result = format.nextRecord(result);
+ assertNull(result);
+ assertTrue(format.reachedEnd());
+ }
+ catch (Exception ex) {
+ fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
+ }
+ }
+
+ @Test
+ public void testDoubleFields() throws IOException {
+ try {
+ final String fileContent = "11.1|22.2|33.3|44.4|55.5\n66.6|77.7|88.8|99.9|00.0|\n";
+ final FileInputSplit split = createTempFile(fileContent);
+
+ final TupleTypeInfo<Tuple5<Double, Double, Double, Double, Double>> typeInfo =
+ TupleTypeInfo.getBasicTupleTypeInfo(Double.class, Double.class, Double.class, Double.class, Double.class);
+ final CsvInputFormat<Tuple5<Double, Double, Double, Double, Double>> format = new CsvInputFormat<Tuple5<Double, Double, Double, Double, Double>>(PATH, typeInfo);
+
+ format.setFieldDelimiter("|");
+
+ format.configure(new Configuration());
+ format.open(split);
+
+ Tuple5<Double, Double, Double, Double, Double> result = new Tuple5<Double, Double, Double, Double, Double>();
+
+ result = format.nextRecord(result);
+ assertNotNull(result);
+ assertEquals(Double.valueOf(11.1), result.f0);
+ assertEquals(Double.valueOf(22.2), result.f1);
+ assertEquals(Double.valueOf(33.3), result.f2);
+ assertEquals(Double.valueOf(44.4), result.f3);
+ assertEquals(Double.valueOf(55.5), result.f4);
+
+ result = format.nextRecord(result);
+ assertNotNull(result);
+ assertEquals(Double.valueOf(66.6), result.f0);
+ assertEquals(Double.valueOf(77.7), result.f1);
+ assertEquals(Double.valueOf(88.8), result.f2);
+ assertEquals(Double.valueOf(99.9), result.f3);
+ assertEquals(Double.valueOf(00.0), result.f4);
result = format.nextRecord(result);
assertNull(result);
@@ -367,7 +466,7 @@ public class CsvInputFormatTest {
public void testReadFirstN() throws IOException {
try {
final String fileContent = "111|222|333|444|555|\n666|777|888|999|000|\n";
- final FileInputSplit split = createTempFile(fileContent);
+ final FileInputSplit split = createTempFile(fileContent);
final TupleTypeInfo<Tuple2<Integer, Integer>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class);
final CsvInputFormat<Tuple2<Integer, Integer>> format = new CsvInputFormat<Tuple2<Integer, Integer>>(PATH, typeInfo);
@@ -490,8 +589,9 @@ public class CsvInputFormatTest {
format.setFieldDelimiter("&&");
- format.setFields(new boolean[] { true, false, false, true, false, false, false, true }, new Class<?>[] { Integer.class,
- Integer.class, Integer.class });
+ format.setFields(new boolean[]{true, false, false, true, false, false, false, true}, new
+ Class<?>[]{Integer.class,
+ Integer.class, Integer.class});
format.configure(new Configuration());
format.open(split);
@@ -547,7 +647,7 @@ public class CsvInputFormatTest {
Object[][] failures = {
{"\"string\" trailing", FieldParser.ParseErrorState.UNQUOTED_CHARS_AFTER_QUOTED_STRING},
- {"\"unterminated ", FieldParser.ParseErrorState.UNTERMINATED_QUOTED_STRING}
+ {"\"unterminated ", FieldParser.ParseErrorState.UNTERMINATED_QUOTED_STRING}
};
for (Object[] failure : failures) {
@@ -809,7 +909,8 @@ public class CsvInputFormatTest {
@SuppressWarnings("unchecked")
TypeInformation<PojoItem> typeInfo = (TypeInformation<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
CsvInputFormat<PojoItem> inputFormat = new CsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
- inputFormat.setFields(new boolean[]{true, false, true, false, true, true}, new Class[]{Integer.class, String.class, Double.class, String.class});
+ inputFormat.setFields(new boolean[]{true, false, true, false, true, true}, new Class[]{Integer.class, String
+ .class, Double.class, String.class});
inputFormat.configure(new Configuration());
FileInputSplit[] splits = inputFormat.createInputSplits(1);