You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/05/19 09:51:14 UTC

[2/2] flink git commit: [FLINK-1820] Consistent behavior of numeric value parsers.

[FLINK-1820] Consistent behavior of numeric value parsers.

This closes #566


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/39d526e6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/39d526e6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/39d526e6

Branch: refs/heads/master
Commit: 39d526e6f8b26ff35e1023c65293982285ffcc78
Parents: 6403dbd
Author: FelixNeutatz <ne...@googlemail.com>
Authored: Fri Apr 3 18:13:32 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue May 19 00:52:29 2015 +0200

----------------------------------------------------------------------
 .../apache/flink/types/parser/ByteParser.java   |  68 +++++-----
 .../flink/types/parser/ByteValueParser.java     |   4 +
 .../apache/flink/types/parser/DoubleParser.java |  69 +++++-----
 .../flink/types/parser/DoubleValueParser.java   |   8 +-
 .../apache/flink/types/parser/FieldParser.java  |   8 +-
 .../apache/flink/types/parser/FloatParser.java  |  69 +++++-----
 .../flink/types/parser/FloatValueParser.java    |   8 +-
 .../apache/flink/types/parser/IntParser.java    |  69 +++++-----
 .../flink/types/parser/IntValueParser.java      |   4 +
 .../apache/flink/types/parser/LongParser.java   |  51 ++++----
 .../flink/types/parser/LongValueParser.java     |   4 +
 .../apache/flink/types/parser/ShortParser.java  |  47 +++----
 .../flink/types/parser/ShortValueParser.java    |   4 +
 .../flink/types/parser/ByteParserTest.java      |   7 +-
 .../flink/types/parser/ByteValueParserTest.java |   2 +-
 .../flink/types/parser/DoubleParserTest.java    |   2 +-
 .../types/parser/DoubleValueParserTest.java     |   2 +-
 .../flink/types/parser/FloatParserTest.java     |   2 +-
 .../types/parser/FloatValueParserTest.java      |   2 +-
 .../flink/types/parser/IntParserTest.java       |   2 +-
 .../flink/types/parser/IntValueParserTest.java  |   3 +-
 .../flink/types/parser/LongParserTest.java      |   2 +-
 .../flink/types/parser/LongValueParserTest.java |   2 +-
 .../flink/types/parser/ParserTestBase.java      |  40 +++++-
 .../flink/types/parser/ShortParserTest.java     |   2 +-
 .../types/parser/ShortValueParserTest.java      |   2 +-
 .../flink/api/java/io/CsvInputFormatTest.java   | 127 +++++++++++++++++--
 27 files changed, 405 insertions(+), 205 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java
index 5858da2..09e517a 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java
@@ -21,22 +21,23 @@ package org.apache.flink.types.parser;
 
 
 public class ByteParser extends FieldParser<Byte> {
-	
+
 	private byte result;
-	
+
 	@Override
 	public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Byte reusable) {
 		int val = 0;
 		boolean neg = false;
 
-		final int delimLimit = limit-delimiter.length+1;
-		
+		final int delimLimit = limit - delimiter.length + 1;
+
 		if (bytes[startPos] == '-') {
 			neg = true;
 			startPos++;
-			
+
 			// check for empty field with only the sign
-			if (startPos == limit || (startPos < delimLimit && delimiterNext(bytes, startPos, delimiter))) {
+			if (startPos == limit || (startPos < delimLimit && delimiterNext(bytes, startPos, 
+				delimiter))) {
 				setErrorState(ParseErrorState.NUMERIC_VALUE_ORPHAN_SIGN);
 				return -1;
 			}
@@ -44,6 +45,10 @@ public class ByteParser extends FieldParser<Byte> {
 
 		for (int i = startPos; i < limit; i++) {
 			if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
+				if (i == startPos) {
+					setErrorState(ParseErrorState.EMPTY_STRING);
+					return -1;
+				}
 				this.result = (byte) (neg ? -val : val);
 				return i + delimiter.length;
 			}
@@ -53,17 +58,17 @@ public class ByteParser extends FieldParser<Byte> {
 			}
 			val *= 10;
 			val += bytes[i] - 48;
-			
+
 			if (val > Byte.MAX_VALUE && (!neg || val > -Byte.MIN_VALUE)) {
 				setErrorState(ParseErrorState.NUMERIC_VALUE_OVERFLOW_UNDERFLOW);
 				return -1;
 			}
 		}
-		
+
 		this.result = (byte) (neg ? -val : val);
 		return limit;
 	}
-	
+
 	@Override
 	public Byte createValue() {
 		return Byte.MIN_VALUE;
@@ -73,43 +78,40 @@ public class ByteParser extends FieldParser<Byte> {
 	public Byte getLastResult() {
 		return Byte.valueOf(this.result);
 	}
-	
+
 	/**
-	 * Static utility to parse a field of type byte from a byte sequence that represents text characters
+	 * Static utility to parse a field of type byte from a byte sequence that represents text 
+	 * characters
 	 * (such as when read from a file stream).
-	 * 
-	 * @param bytes The bytes containing the text data that should be parsed.
+	 *
+	 * @param bytes    The bytes containing the text data that should be parsed.
 	 * @param startPos The offset to start the parsing.
-	 * @param length The length of the byte sequence (counting from the offset).
-	 * 
+	 * @param length   The length of the byte sequence (counting from the offset).
 	 * @return The parsed value.
-	 * 
-	 * @throws NumberFormatException Thrown when the value cannot be parsed because the text represents not a correct number.
+	 * @throws NumberFormatException Thrown when the value cannot be parsed because the text 
+	 * represents not a correct number.
 	 */
 	public static final byte parseField(byte[] bytes, int startPos, int length) {
 		return parseField(bytes, startPos, length, (char) 0xffff);
 	}
-	
+
 	/**
-	 * Static utility to parse a field of type byte from a byte sequence that represents text characters
+	 * Static utility to parse a field of type byte from a byte sequence that represents text 
+	 * characters
 	 * (such as when read from a file stream).
-	 * 
-	 * @param bytes The bytes containing the text data that should be parsed.
-	 * @param startPos The offset to start the parsing.
-	 * @param length The length of the byte sequence (counting from the offset).
+	 *
+	 * @param bytes     The bytes containing the text data that should be parsed.
+	 * @param startPos  The offset to start the parsing.
+	 * @param length    The length of the byte sequence (counting from the offset).
 	 * @param delimiter The delimiter that terminates the field.
-	 * 
 	 * @return The parsed value.
-	 * 
-	 * @throws NumberFormatException Thrown when the value cannot be parsed because the text represents not a correct number.
+	 * @throws NumberFormatException Thrown when the value cannot be parsed because the text 
+	 * represents not a correct number.
 	 */
 	public static final byte parseField(byte[] bytes, int startPos, int length, char delimiter) {
-		if (length <= 0) {
-			throw new NumberFormatException("Invalid input: Empty string");
-		}
 		long val = 0;
 		boolean neg = false;
-		
+
 		if (bytes[startPos] == '-') {
 			neg = true;
 			startPos++;
@@ -118,17 +120,17 @@ public class ByteParser extends FieldParser<Byte> {
 				throw new NumberFormatException("Orphaned minus sign.");
 			}
 		}
-		
+
 		for (; length > 0; startPos++, length--) {
 			if (bytes[startPos] == delimiter) {
-				return (byte) (neg ? -val : val);
+				throw new NumberFormatException("Empty field.");
 			}
 			if (bytes[startPos] < 48 || bytes[startPos] > 57) {
 				throw new NumberFormatException("Invalid character.");
 			}
 			val *= 10;
 			val += bytes[startPos] - 48;
-			
+
 			if (val > Byte.MAX_VALUE && (!neg || val > -Byte.MIN_VALUE)) {
 				throw new NumberFormatException("Value overflow/underflow");
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java
index f9b36e4..612a1cb 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java
@@ -52,6 +52,10 @@ public class ByteValueParser extends FieldParser<ByteValue> {
 		for (int i = startPos; i < limit; i++) {
 
 			if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
+				if (i == startPos) {
+					setErrorState(ParseErrorState.EMPTY_STRING);
+					return -1;
+				}
 				reusable.setValue((byte) (neg ? -val : val));
 				return i + delimiter.length;
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
index 947fdfe..086c1f5 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
@@ -23,35 +23,39 @@ package org.apache.flink.types.parser;
  * Parses a text field into a Double.
  */
 public class DoubleParser extends FieldParser<Double> {
-	
+
 	private static final Double DOUBLE_INSTANCE = Double.valueOf(0.0);
-	
+
 	private double result;
-	
+
 	@Override
 	public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Double reusable) {
 		int i = startPos;
 
-		final int delimLimit = limit-delimiter.length+1;
-		
+		final int delimLimit = limit - delimiter.length + 1;
+
 		while (i < limit) {
 			if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
 				break;
 			}
 			i++;
 		}
-		
-		String str = new String(bytes, startPos, i-startPos);
+
+		String str = new String(bytes, startPos, i - startPos);
+		int len = str.length();
+		if (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[Math.max(i - 1, 0)])) {
+			setErrorState(ParseErrorState.WHITESPACE_IN_NUMERIC_FIELD);
+			return -1;
+		}
 		try {
 			this.result = Double.parseDouble(str);
 			return (i == limit) ? limit : i + delimiter.length;
-		}
-		catch (NumberFormatException e) {
+		} catch (NumberFormatException e) {
 			setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR);
 			return -1;
 		}
 	}
-	
+
 	@Override
 	public Double createValue() {
 		return DOUBLE_INSTANCE;
@@ -61,35 +65,35 @@ public class DoubleParser extends FieldParser<Double> {
 	public Double getLastResult() {
 		return Double.valueOf(this.result);
 	}
-	
+
 	/**
-	 * Static utility to parse a field of type double from a byte sequence that represents text characters
+	 * Static utility to parse a field of type double from a byte sequence that represents text 
+	 * characters
 	 * (such as when read from a file stream).
-	 * 
-	 * @param bytes The bytes containing the text data that should be parsed.
+	 *
+	 * @param bytes    The bytes containing the text data that should be parsed.
 	 * @param startPos The offset to start the parsing.
-	 * @param length The length of the byte sequence (counting from the offset).
-	 * 
+	 * @param length   The length of the byte sequence (counting from the offset).
 	 * @return The parsed value.
-	 * 
-	 * @throws NumberFormatException Thrown when the value cannot be parsed because the text represents not a correct number.
+	 * @throws NumberFormatException Thrown when the value cannot be parsed because the text 
+	 * represents not a correct number.
 	 */
 	public static final double parseField(byte[] bytes, int startPos, int length) {
 		return parseField(bytes, startPos, length, (char) 0xffff);
 	}
-	
+
 	/**
-	 * Static utility to parse a field of type double from a byte sequence that represents text characters
+	 * Static utility to parse a field of type double from a byte sequence that represents text 
+	 * characters
 	 * (such as when read from a file stream).
-	 * 
-	 * @param bytes The bytes containing the text data that should be parsed.
-	 * @param startPos The offset to start the parsing.
-	 * @param length The length of the byte sequence (counting from the offset).
+	 *
+	 * @param bytes     The bytes containing the text data that should be parsed.
+	 * @param startPos  The offset to start the parsing.
+	 * @param length    The length of the byte sequence (counting from the offset).
 	 * @param delimiter The delimiter that terminates the field.
-	 * 
 	 * @return The parsed value.
-	 * 
-	 * @throws NumberFormatException Thrown when the value cannot be parsed because the text represents not a correct number.
+	 * @throws NumberFormatException Thrown when the value cannot be parsed because the text 
+	 * represents not a correct number.
 	 */
 	public static final double parseField(byte[] bytes, int startPos, int length, char delimiter) {
 		if (length <= 0) {
@@ -97,12 +101,17 @@ public class DoubleParser extends FieldParser<Double> {
 		}
 		int i = 0;
 		final byte delByte = (byte) delimiter;
-		
+
 		while (i < length && bytes[i] != delByte) {
 			i++;
 		}
-		
-		String str = new String(bytes, startPos, i);
+
+		String str = new String(bytes, startPos, i - startPos);
+		int len = str.length();
+		if (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[Math.max(i - 1, 0)])) {
+			throw new NumberFormatException("There is leading or trailing whitespace in the " +
+				"numeric field: " + str);
+		}
 		return Double.parseDouble(str);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
index e225c1f..7751831 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
@@ -33,7 +33,7 @@ public class DoubleValueParser extends FieldParser<DoubleValue> {
 		
 		int i = startPos;
 
-		final int delimLimit = limit-delimiter.length+1;
+		final int delimLimit = limit - delimiter.length + 1;
 
 		while (i < limit) {
 			if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
@@ -42,7 +42,11 @@ public class DoubleValueParser extends FieldParser<DoubleValue> {
 			i++;
 		}
 		
-		String str = new String(bytes, startPos, i-startPos);
+		String str = new String(bytes, startPos, i - startPos);
+		if (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[Math.max(i - 1, 0)])) {
+			setErrorState(ParseErrorState.WHITESPACE_IN_NUMERIC_FIELD);
+			return -1;
+		}
 		try {
 			double value = Double.parseDouble(str);
 			reusable.setValue(value);

http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
index 33697fd..55e9915 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
@@ -64,7 +64,13 @@ public abstract class FieldParser<T> {
 		UNTERMINATED_QUOTED_STRING,
 
 		/** The parser found characters between the end of the quoted string and the delimiter. */
-		UNQUOTED_CHARS_AFTER_QUOTED_STRING
+		UNQUOTED_CHARS_AFTER_QUOTED_STRING,
+		
+		/** The string is empty. */
+		EMPTY_STRING,
+
+		/** There is whitespace in a numeric field. */
+		WHITESPACE_IN_NUMERIC_FIELD
 	}
 	
 	private ParseErrorState errorState = ParseErrorState.NONE;

http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
index 7d166c7..be98aa1 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
@@ -23,15 +23,16 @@ package org.apache.flink.types.parser;
  * Parses a text field into a {@link Float}.
  */
 public class FloatParser extends FieldParser<Float> {
-	
+
 	private float result;
 	
 	@Override
-	public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Float reusable) {
-		
+	public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Float 
+		reusable) {
+
 		int i = startPos;
 
-		final int delimLimit = limit-delimiter.length+1;
+		final int delimLimit = limit - delimiter.length + 1;
 
 		while (i < limit) {
 			if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
@@ -39,18 +40,23 @@ public class FloatParser extends FieldParser<Float> {
 			}
 			i++;
 		}
-		
-		String str = new String(bytes, startPos, i-startPos);
+
+		if (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[Math.max(i - 1, 0)])) {
+			setErrorState(ParseErrorState.WHITESPACE_IN_NUMERIC_FIELD);
+			return -1;
+		}
+
+		String str = new String(bytes, startPos, i - startPos);
+		int len = str.length();
 		try {
 			this.result = Float.parseFloat(str);
-			return (i == limit) ? limit : i+ delimiter.length;
-		}
-		catch (NumberFormatException e) {
+			return (i == limit) ? limit : i + delimiter.length;
+		} catch (NumberFormatException e) {
 			setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR);
 			return -1;
 		}
 	}
-	
+
 	@Override
 	public Float createValue() {
 		return Float.MIN_VALUE;
@@ -60,35 +66,35 @@ public class FloatParser extends FieldParser<Float> {
 	public Float getLastResult() {
 		return Float.valueOf(this.result);
 	}
-	
+
 	/**
-	 * Static utility to parse a field of type float from a byte sequence that represents text characters
+	 * Static utility to parse a field of type float from a byte sequence that represents text 
+	 * characters
 	 * (such as when read from a file stream).
-	 * 
-	 * @param bytes The bytes containing the text data that should be parsed.
+	 *
+	 * @param bytes    The bytes containing the text data that should be parsed.
 	 * @param startPos The offset to start the parsing.
-	 * @param length The length of the byte sequence (counting from the offset).
-	 * 
+	 * @param length   The length of the byte sequence (counting from the offset).
 	 * @return The parsed value.
-	 * 
-	 * @throws NumberFormatException Thrown when the value cannot be parsed because the text represents not a correct number.
+	 * @throws NumberFormatException Thrown when the value cannot be parsed because the text 
+	 * represents not a correct number.
 	 */
 	public static final float parseField(byte[] bytes, int startPos, int length) {
 		return parseField(bytes, startPos, length, (char) 0xffff);
 	}
-	
+
 	/**
-	 * Static utility to parse a field of type float from a byte sequence that represents text characters
+	 * Static utility to parse a field of type float from a byte sequence that represents text 
+	 * characters
 	 * (such as when read from a file stream).
-	 * 
-	 * @param bytes The bytes containing the text data that should be parsed.
-	 * @param startPos The offset to start the parsing.
-	 * @param length The length of the byte sequence (counting from the offset).
+	 *
+	 * @param bytes     The bytes containing the text data that should be parsed.
+	 * @param startPos  The offset to start the parsing.
+	 * @param length    The length of the byte sequence (counting from the offset).
 	 * @param delimiter The delimiter that terminates the field.
-	 * 
 	 * @return The parsed value.
-	 * 
-	 * @throws NumberFormatException Thrown when the value cannot be parsed because the text represents not a correct number.
+	 * @throws NumberFormatException Thrown when the value cannot be parsed because the text 
+	 * represents not a correct number.
 	 */
 	public static final float parseField(byte[] bytes, int startPos, int length, char delimiter) {
 		if (length <= 0) {
@@ -96,12 +102,17 @@ public class FloatParser extends FieldParser<Float> {
 		}
 		int i = 0;
 		final byte delByte = (byte) delimiter;
-		
+
 		while (i < length && bytes[i] != delByte) {
 			i++;
 		}
 		
-		String str = new String(bytes, startPos, i);
+		String str = new String(bytes, startPos, i - startPos);
+		if (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[Math.max(i - 1, 0)])) {
+			throw new NumberFormatException("There is leading or trailing whitespace in the " +
+				"numeric field: " + str);
+		}
+		int len = str.length();
 		return Float.parseFloat(str);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
index af16d4c..e8caac2 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
@@ -33,7 +33,7 @@ public class FloatValueParser extends FieldParser<FloatValue> {
 		
 		int i = startPos;
 
-		final int delimLimit = limit-delimiter.length+1;
+		final int delimLimit = limit - delimiter.length + 1;
 
 		while (i < limit) {
 			if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
@@ -42,7 +42,11 @@ public class FloatValueParser extends FieldParser<FloatValue> {
 			i++;
 		}
 		
-		String str = new String(bytes, startPos, i-startPos);
+		String str = new String(bytes, startPos, i - startPos);
+		if (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[Math.max(i - 1, 0)])) {
+			setErrorState(ParseErrorState.WHITESPACE_IN_NUMERIC_FIELD);
+			return -1;
+		}
 		try {
 			float value = Float.parseFloat(str);
 			reusable.setValue(value);

http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java
index c871f4a..dcd2ec2 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java
@@ -25,32 +25,38 @@ package org.apache.flink.types.parser;
  * The parser does not check for the maximum value.
  */
 public class IntParser extends FieldParser<Integer> {
-	
+
 	private static final long OVERFLOW_BOUND = 0x7fffffffL;
 	private static final long UNDERFLOW_BOUND = 0x80000000L;
 
 	private int result;
-	
+
 	@Override
-	public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Integer reusable) {
+	public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Integer 
+		reusable) {
 		long val = 0;
 		boolean neg = false;
 
-		final int delimLimit = limit-delimiter.length+1;
+		final int delimLimit = limit - delimiter.length + 1;
 
 		if (bytes[startPos] == '-') {
 			neg = true;
 			startPos++;
-			
+
 			// check for empty field with only the sign
-			if (startPos == limit || ( startPos < delimLimit && delimiterNext(bytes, startPos, delimiter))) {
+			if (startPos == limit || (startPos < delimLimit && delimiterNext(bytes, startPos, 
+				delimiter))) {
 				setErrorState(ParseErrorState.NUMERIC_VALUE_ORPHAN_SIGN);
 				return -1;
 			}
 		}
-		
+
 		for (int i = startPos; i < limit; i++) {
 			if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
+				if (i == startPos) {
+					setErrorState(ParseErrorState.EMPTY_STRING);
+					return -1;
+				}
 				this.result = (int) (neg ? -val : val);
 				return i + delimiter.length;
 			}
@@ -60,17 +66,17 @@ public class IntParser extends FieldParser<Integer> {
 			}
 			val *= 10;
 			val += bytes[i] - 48;
-			
+
 			if (val > OVERFLOW_BOUND && (!neg || val > UNDERFLOW_BOUND)) {
 				setErrorState(ParseErrorState.NUMERIC_VALUE_OVERFLOW_UNDERFLOW);
 				return -1;
 			}
 		}
-		
+
 		this.result = (int) (neg ? -val : val);
 		return limit;
 	}
-	
+
 	@Override
 	public Integer createValue() {
 		return Integer.MIN_VALUE;
@@ -80,40 +86,37 @@ public class IntParser extends FieldParser<Integer> {
 	public Integer getLastResult() {
 		return Integer.valueOf(this.result);
 	}
-	
+
 	/**
-	 * Static utility to parse a field of type int from a byte sequence that represents text characters
+	 * Static utility to parse a field of type int from a byte sequence that represents text 
+	 * characters
 	 * (such as when read from a file stream).
-	 * 
-	 * @param bytes The bytes containing the text data that should be parsed.
+	 *
+	 * @param bytes    The bytes containing the text data that should be parsed.
 	 * @param startPos The offset to start the parsing.
-	 * @param length The length of the byte sequence (counting from the offset).
-	 * 
+	 * @param length   The length of the byte sequence (counting from the offset).
 	 * @return The parsed value.
-	 * 
-	 * @throws NumberFormatException Thrown when the value cannot be parsed because the text represents not a correct number.
+	 * @throws NumberFormatException Thrown when the value cannot be parsed because the text 
+	 * represents not a correct number.
 	 */
 	public static final int parseField(byte[] bytes, int startPos, int length) {
 		return parseField(bytes, startPos, length, (char) 0xffff);
 	}
-	
+
 	/**
-	 * Static utility to parse a field of type int from a byte sequence that represents text characters
+	 * Static utility to parse a field of type int from a byte sequence that represents text 
+	 * characters
 	 * (such as when read from a file stream).
-	 * 
-	 * @param bytes The bytes containing the text data that should be parsed.
-	 * @param startPos The offset to start the parsing.
-	 * @param length The length of the byte sequence (counting from the offset).
+	 *
+	 * @param bytes     The bytes containing the text data that should be parsed.
+	 * @param startPos  The offset to start the parsing.
+	 * @param length    The length of the byte sequence (counting from the offset).
 	 * @param delimiter The delimiter that terminates the field.
-	 * 
 	 * @return The parsed value.
-	 * 
-	 * @throws NumberFormatException Thrown when the value cannot be parsed because the text represents not a correct number.
+	 * @throws NumberFormatException Thrown when the value cannot be parsed because the text 
+	 * represents not a correct number.
 	 */
 	public static final int parseField(byte[] bytes, int startPos, int length, char delimiter) {
-		if (length <= 0) {
-			throw new NumberFormatException("Invalid input: Empty string");
-		}
 		long val = 0;
 		boolean neg = false;
 
@@ -125,17 +128,17 @@ public class IntParser extends FieldParser<Integer> {
 				throw new NumberFormatException("Orphaned minus sign.");
 			}
 		}
-		
+
 		for (; length > 0; startPos++, length--) {
 			if (bytes[startPos] == delimiter) {
-				return (int) (neg ? -val : val);
+				throw new NumberFormatException("Empty field.");
 			}
 			if (bytes[startPos] < 48 || bytes[startPos] > 57) {
 				throw new NumberFormatException("Invalid character.");
 			}
 			val *= 10;
 			val += bytes[startPos] - 48;
-			
+
 			if (val > OVERFLOW_BOUND && (!neg || val > UNDERFLOW_BOUND)) {
 				throw new NumberFormatException("Value overflow/underflow");
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java
index 8cb8176..abd8615 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java
@@ -54,6 +54,10 @@ public class IntValueParser extends FieldParser<IntValue> {
 		
 		for (int i = startPos; i < limit; i++) {
 			if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
+				if (i == startPos) {
+					setErrorState(ParseErrorState.EMPTY_STRING);
+					return -1;
+				}
 				reusable.setValue((int) (neg ? -val : val));
 				return i + delimiter.length;
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java
index af17f15..bb6c7c9 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java
@@ -24,9 +24,9 @@ package org.apache.flink.types.parser;
  * Only characters '1' to '0' and '-' are allowed.
  */
 public class LongParser extends FieldParser<Long> {
-	
+
 	private long result;
-	
+
 	@Override
 	public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Long reusable) {
 		long val = 0;
@@ -37,16 +37,20 @@ public class LongParser extends FieldParser<Long> {
 		if (bytes[startPos] == '-') {
 			neg = true;
 			startPos++;
-			
+
 			// check for empty field with only the sign
 			if (startPos == limit || (startPos < delimLimit && delimiterNext(bytes, startPos, delimiter))) {
 				setErrorState(ParseErrorState.NUMERIC_VALUE_ORPHAN_SIGN);
 				return -1;
 			}
 		}
-		
+
 		for (int i = startPos; i < limit; i++) {
 			if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
+				if (i == startPos) {
+					setErrorState(ParseErrorState.EMPTY_STRING);
+					return -1;
+				}
 				this.result = neg ? -val : val;
 				return i + delimiter.length;
 			}
@@ -56,15 +60,15 @@ public class LongParser extends FieldParser<Long> {
 			}
 			val *= 10;
 			val += bytes[i] - 48;
-			
+
 			// check for overflow / underflow
 			if (val < 0) {
 				// this is an overflow/underflow, unless we hit exactly the Long.MIN_VALUE
 				if (neg && val == Long.MIN_VALUE) {
 					this.result = Long.MIN_VALUE;
-					
+
 					if (i+1 >= limit) {
-						return limit; 
+						return limit;
 					} else if (i+1 < delimLimit && delimiterNext(bytes, i+1, delimiter)) {
 						return i + 1 + delimiter.length;
 					} else {
@@ -78,57 +82,54 @@ public class LongParser extends FieldParser<Long> {
 				}
 			}
 		}
-		
+
 		this.result = neg ? -val : val;
 		return limit;
 	}
-	
+
 	@Override
 	public Long createValue() {
 		return Long.MIN_VALUE;
 	}
-	
+
 	@Override
 	public Long getLastResult() {
 		return Long.valueOf(this.result);
 	}
-	
+
 	/**
 	 * Static utility to parse a field of type long from a byte sequence that represents text characters
 	 * (such as when read from a file stream).
-	 * 
+	 *
 	 * @param bytes The bytes containing the text data that should be parsed.
 	 * @param startPos The offset to start the parsing.
 	 * @param length The length of the byte sequence (counting from the offset).
-	 * 
+	 *
 	 * @return The parsed value.
-	 * 
+	 *
 	 * @throws NumberFormatException Thrown when the value cannot be parsed because the text represents not a correct number.
 	 */
 	public static final long parseField(byte[] bytes, int startPos, int length) {
 		return parseField(bytes, startPos, length, (char) 0xffff);
 	}
-	
+
 	/**
 	 * Static utility to parse a field of type long from a byte sequence that represents text characters
 	 * (such as when read from a file stream).
-	 * 
+	 *
 	 * @param bytes The bytes containing the text data that should be parsed.
 	 * @param startPos The offset to start the parsing.
 	 * @param length The length of the byte sequence (counting from the offset).
 	 * @param delimiter The delimiter that terminates the field.
-	 * 
+	 *
 	 * @return The parsed value.
-	 * 
+	 *
 	 * @throws NumberFormatException Thrown when the value cannot be parsed because the text represents not a correct number.
 	 */
 	public static final long parseField(byte[] bytes, int startPos, int length, char delimiter) {
-		if (length <= 0) {
-			throw new NumberFormatException("Invalid input: Empty string");
-		}
 		long val = 0;
 		boolean neg = false;
-		
+
 		if (bytes[startPos] == '-') {
 			neg = true;
 			startPos++;
@@ -137,17 +138,17 @@ public class LongParser extends FieldParser<Long> {
 				throw new NumberFormatException("Orphaned minus sign.");
 			}
 		}
-		
+
 		for (; length > 0; startPos++, length--) {
 			if (bytes[startPos] == delimiter) {
-				return neg ? -val : val;
+				throw new NumberFormatException("Empty field.");
 			}
 			if (bytes[startPos] < 48 || bytes[startPos] > 57) {
 				throw new NumberFormatException("Invalid character.");
 			}
 			val *= 10;
 			val += bytes[startPos] - 48;
-			
+
 			// check for overflow / underflow
 			if (val < 0) {
 				// this is an overflow/underflow, unless we hit exactly the Long.MIN_VALUE

http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java
index 8b697cc..a99a86e 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java
@@ -51,6 +51,10 @@ public class LongValueParser extends FieldParser<LongValue> {
 		
 		for (int i = startPos; i < limit; i++) {
 			if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
+				if (i == startPos) {
+					setErrorState(ParseErrorState.EMPTY_STRING);
+					return -1;
+				}
 				reusable.setValue(neg ? -val : val);
 				return i + delimiter.length;
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java
index a6f9898..6e04d60 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java
@@ -25,10 +25,10 @@ package org.apache.flink.types.parser;
  * The parser does not check for the maximum value.
  */
 public class ShortParser extends FieldParser<Short> {
-	
+
 	private static final int OVERFLOW_BOUND = 0x7fff;
 	private static final int UNDERFLOW_BOUND = 0x8000;
-	
+
 	private short result;
 
 	@Override
@@ -37,20 +37,24 @@ public class ShortParser extends FieldParser<Short> {
 		boolean neg = false;
 
 		final int delimLimit = limit-delimiter.length+1;
-		
+
 		if (bytes[startPos] == '-') {
 			neg = true;
 			startPos++;
-			
+
 			// check for empty field with only the sign
 			if (startPos == limit || (startPos < delimLimit && delimiterNext(bytes, startPos, delimiter))) {
 				setErrorState(ParseErrorState.NUMERIC_VALUE_ORPHAN_SIGN);
 				return -1;
 			}
 		}
-		
+
 		for (int i = startPos; i < limit; i++) {
 			if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
+				if (i == startPos) {
+					setErrorState(ParseErrorState.EMPTY_STRING);
+					return -1;
+				}
 				this.result = (short) (neg ? -val : val);
 				return i + delimiter.length;
 			}
@@ -60,17 +64,17 @@ public class ShortParser extends FieldParser<Short> {
 			}
 			val *= 10;
 			val += bytes[i] - 48;
-			
+
 			if (val > OVERFLOW_BOUND && (!neg || val > UNDERFLOW_BOUND)) {
 				setErrorState(ParseErrorState.NUMERIC_VALUE_OVERFLOW_UNDERFLOW);
 				return -1;
 			}
 		}
-		
+
 		this.result = (short) (neg ? -val : val);
 		return limit;
 	}
-	
+
 	@Override
 	public Short createValue() {
 		return Short.MIN_VALUE;
@@ -80,43 +84,40 @@ public class ShortParser extends FieldParser<Short> {
 	public Short getLastResult() {
 		return Short.valueOf(this.result);
 	}
-	
+
 	/**
 	 * Static utility to parse a field of type short from a byte sequence that represents text characters
 	 * (such as when read from a file stream).
-	 * 
+	 *
 	 * @param bytes The bytes containing the text data that should be parsed.
 	 * @param startPos The offset to start the parsing.
 	 * @param length The length of the byte sequence (counting from the offset).
-	 * 
+	 *
 	 * @return The parsed value.
-	 * 
+	 *
 	 * @throws NumberFormatException Thrown when the value cannot be parsed because the text represents not a correct number.
 	 */
 	public static final short parseField(byte[] bytes, int startPos, int length) {
 		return parseField(bytes, startPos, length, (char) 0xffff);
 	}
-	
+
 	/**
 	 * Static utility to parse a field of type short from a byte sequence that represents text characters
 	 * (such as when read from a file stream).
-	 * 
+	 *
 	 * @param bytes The bytes containing the text data that should be parsed.
 	 * @param startPos The offset to start the parsing.
 	 * @param length The length of the byte sequence (counting from the offset).
 	 * @param delimiter The delimiter that terminates the field.
-	 * 
+	 *
 	 * @return The parsed value.
-	 * 
+	 *
 	 * @throws NumberFormatException Thrown when the value cannot be parsed because the text represents not a correct number.
 	 */
 	public static final short parseField(byte[] bytes, int startPos, int length, char delimiter) {
-		if (length <= 0) {
-			throw new NumberFormatException("Invalid input: Empty string");
-		}
 		long val = 0;
 		boolean neg = false;
-		
+
 		if (bytes[startPos] == '-') {
 			neg = true;
 			startPos++;
@@ -125,17 +126,17 @@ public class ShortParser extends FieldParser<Short> {
 				throw new NumberFormatException("Orphaned minus sign.");
 			}
 		}
-		
+
 		for (; length > 0; startPos++, length--) {
 			if (bytes[startPos] == delimiter) {
-				return (short) (neg ? -val : val);
+				throw new NumberFormatException("Empty field.");
 			}
 			if (bytes[startPos] < 48 || bytes[startPos] > 57) {
 				throw new NumberFormatException("Invalid character.");
 			}
 			val *= 10;
 			val += bytes[startPos] - 48;
-			
+
 			if (val > OVERFLOW_BOUND && (!neg || val > UNDERFLOW_BOUND)) {
 				throw new NumberFormatException("Value overflow/underflow");
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java
index f5168cc..4289d1a 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java
@@ -54,6 +54,10 @@ public class ShortValueParser extends FieldParser<ShortValue> {
 		
 		for (int i = startPos; i < limit; i++) {
 			if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
+				if (i == startPos) {
+					setErrorState(ParseErrorState.EMPTY_STRING);
+					return -1;
+				}
 				reusable.setValue((short) (neg ? -val : val));
 				return i + delimiter.length;
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java
index 37d6903..ac49783 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java
@@ -22,6 +22,10 @@ package org.apache.flink.types.parser;
 import org.apache.flink.types.parser.ByteParser;
 import org.apache.flink.types.parser.FieldParser;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 
 public class ByteParserTest extends ParserTestBase<Byte> {
 
@@ -43,7 +47,7 @@ public class ByteParserTest extends ParserTestBase<Byte> {
 	public String[] getInvalidTestValues() {
 		return new String[] {
 			"a", "9a", "-57-6", "7-88", String.valueOf(Byte.MAX_VALUE) + "0", String.valueOf(Short.MIN_VALUE),
-			String.valueOf(Byte.MAX_VALUE + 1), String.valueOf(Byte.MIN_VALUE - 1)
+			String.valueOf(Byte.MAX_VALUE + 1), String.valueOf(Byte.MIN_VALUE - 1),  " 1", "2 ", " ", "\t"
 		};
 	}
 
@@ -56,4 +60,5 @@ public class ByteParserTest extends ParserTestBase<Byte> {
 	public Class<Byte> getTypeClass() {
 		return Byte.class;
 	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/test/java/org/apache/flink/types/parser/ByteValueParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/ByteValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/ByteValueParserTest.java
index a6c315a..1df3429 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/ByteValueParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/ByteValueParserTest.java
@@ -45,7 +45,7 @@ public class ByteValueParserTest extends ParserTestBase<ByteValue> {
 	public String[] getInvalidTestValues() {
 		return new String[] {
 			"a", "9a", "-57-6", "7-88", String.valueOf(Byte.MAX_VALUE) + "0", String.valueOf(Short.MIN_VALUE),
-			String.valueOf(Byte.MAX_VALUE + 1), String.valueOf(Byte.MIN_VALUE - 1)
+			String.valueOf(Byte.MAX_VALUE + 1), String.valueOf(Byte.MIN_VALUE - 1),  " 1", "2 ", " ", "\t"
 		};
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/test/java/org/apache/flink/types/parser/DoubleParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/DoubleParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/DoubleParserTest.java
index 71e78a0..c68dd43 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/DoubleParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/DoubleParserTest.java
@@ -50,7 +50,7 @@ public class DoubleParserTest extends ParserTestBase<Double> {
 	@Override
 	public String[] getInvalidTestValues() {
 		return new String[] {
-			"a", "123abc4", "-57-6", "7-877678"
+			"a", "123abc4", "-57-6", "7-877678", " 1", "2 ", " ", "\t"
 		};
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/test/java/org/apache/flink/types/parser/DoubleValueParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/DoubleValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/DoubleValueParserTest.java
index 120dfac..7908180 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/DoubleValueParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/DoubleValueParserTest.java
@@ -52,7 +52,7 @@ public class DoubleValueParserTest extends ParserTestBase<DoubleValue> {
 	@Override
 	public String[] getInvalidTestValues() {
 		return new String[] {
-			"a", "123abc4", "-57-6", "7-877678"
+			"a", "123abc4", "-57-6", "7-877678", " 1", "2 ", " ", "\t"
 		};
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/test/java/org/apache/flink/types/parser/FloatParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/FloatParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/FloatParserTest.java
index 3c450a5..012e353 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/FloatParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/FloatParserTest.java
@@ -50,7 +50,7 @@ public class FloatParserTest extends ParserTestBase<Float> {
 	@Override
 	public String[] getInvalidTestValues() {
 		return new String[] {
-			"a", "123abc4", "-57-6", "7-877678"
+			"a", "123abc4", "-57-6", "7-877678", " 1", "2 ", " ", "\t"
 		};
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/test/java/org/apache/flink/types/parser/FloatValueParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/FloatValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/FloatValueParserTest.java
index be5b5b8..2b85de0 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/FloatValueParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/FloatValueParserTest.java
@@ -52,7 +52,7 @@ public class FloatValueParserTest extends ParserTestBase<FloatValue> {
 	@Override
 	public String[] getInvalidTestValues() {
 		return new String[] {
-			"a", "123abc4", "-57-6", "7-877678"
+			"a", "123abc4", "-57-6", "7-877678", " 1", "2 ", " ", "\t"
 		};
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/test/java/org/apache/flink/types/parser/IntParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/IntParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/IntParserTest.java
index 6e1d4db..0f11fbd 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/IntParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/IntParserTest.java
@@ -43,7 +43,7 @@ public class IntParserTest extends ParserTestBase<Integer> {
 	public String[] getInvalidTestValues() {
 		return new String[] {
 			"a", "1569a86", "-57-6", "7-877678", String.valueOf(Integer.MAX_VALUE) + "0", String.valueOf(Long.MIN_VALUE),
-			String.valueOf(((long) Integer.MAX_VALUE) + 1), String.valueOf(((long) Integer.MIN_VALUE) - 1)
+			String.valueOf(((long) Integer.MAX_VALUE) + 1), String.valueOf(((long) Integer.MIN_VALUE) - 1), " 1", "2 ", " ", "\t"
 		};
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/test/java/org/apache/flink/types/parser/IntValueParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/IntValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/IntValueParserTest.java
index e32f704..2b6d72e 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/IntValueParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/IntValueParserTest.java
@@ -45,7 +45,8 @@ public class IntValueParserTest extends ParserTestBase<IntValue> {
 	public String[] getInvalidTestValues() {
 		return new String[] {
 			"a", "1569a86", "-57-6", "7-877678", String.valueOf(Integer.MAX_VALUE) + "0", String.valueOf(Long.MIN_VALUE),
-			String.valueOf(((long) Integer.MAX_VALUE) + 1), String.valueOf(((long) Integer.MIN_VALUE) - 1)
+			String.valueOf(((long) Integer.MAX_VALUE) + 1), String.valueOf(((long) Integer.MIN_VALUE) - 1),
+			" 1", "2 ", " ", "\t"
 		};
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/test/java/org/apache/flink/types/parser/LongParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/LongParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/LongParserTest.java
index 4dd116b..2f7ac8f 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/LongParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/LongParserTest.java
@@ -45,7 +45,7 @@ public class LongParserTest extends ParserTestBase<Long> {
 	public String[] getInvalidTestValues() {
 		return new String[] {
 			"a", "1569a86", "-57-6", "7-877678", String.valueOf(Long.MAX_VALUE) + "0", String.valueOf(Long.MIN_VALUE) + "0",
-			"9223372036854775808", "-9223372036854775809"
+			"9223372036854775808", "-9223372036854775809", " 1", "2 ", " ", "\t"
 		};
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/test/java/org/apache/flink/types/parser/LongValueParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/LongValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/LongValueParserTest.java
index fac6f42..2000907 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/LongValueParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/LongValueParserTest.java
@@ -47,7 +47,7 @@ public class LongValueParserTest extends ParserTestBase<LongValue> {
 	public String[] getInvalidTestValues() {
 		return new String[] {
 			"a", "1569a86", "-57-6", "7-877678", String.valueOf(Long.MAX_VALUE) + "0", String.valueOf(Long.MIN_VALUE) + "0",
-			"9223372036854775808", "-9223372036854775809"
+			"9223372036854775808", "-9223372036854775809", " 1", "2 ", " ", "\t"
 		};
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java b/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java
index fb56add..dabac6f 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java
@@ -25,7 +25,9 @@ import static org.junit.Assert.fail;
 
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.util.Arrays;
 
+import org.apache.flink.types.StringValue;
 import org.apache.flink.types.parser.FieldParser;
 import org.junit.Test;
 
@@ -45,7 +47,6 @@ public abstract class ParserTestBase<T> {
 	
 	public abstract Class<T> getTypeClass();
 	
-
 	@Test
 	public void testTest() {
 		assertNotNull(getParser());
@@ -243,7 +244,7 @@ public abstract class ParserTestBase<T> {
 				FieldParser<T> parser = getParser();
 				
 				byte[] bytes = testValues[i].getBytes();
-				int numRead = parser.parseField(bytes, 0, bytes.length, new byte[] {'|'}, parser.createValue());
+				int numRead = parser.parseField(bytes, 0, bytes.length, new byte[]{'|'}, parser.createValue());
 				
 				assertTrue("Parser accepted the invalid value " + testValues[i] + ".", numRead == -1);
 			}
@@ -402,4 +403,39 @@ public abstract class ParserTestBase<T> {
 		return result;
 	}
 
+	@Test
+	public void testEmptyFieldInIsolation() {
+		try {
+			String [] emptyStrings = new String[] {"|"};
+
+			FieldParser<T> parser = getParser();
+
+			for (String emptyString : emptyStrings) {
+				byte[] bytes = emptyString.getBytes();
+				int numRead = parser.parseField(bytes, 0, bytes.length, new byte[]{'|'}, parser.createValue());
+
+				if (getTypeClass() == String.class) {
+					assertTrue("Parser declared the empty string as invalid.", numRead != -1);
+					assertEquals("Invalid number of bytes read returned.", bytes.length, numRead);
+
+					T result = parser.getLastResult();
+					assertEquals("Parser parsed wrong.", "", result);
+				} else if(getTypeClass() == StringValue.class) {
+					assertTrue("Parser declared the empty string as invalid.", numRead != -1);
+					assertEquals("Invalid number of bytes read returned.", bytes.length, numRead);
+
+					T result = parser.getLastResult();
+					assertEquals("Parser parsed wrong.", new StringValue(""), result);
+				} else {
+					assertTrue("Parser accepted the empty string.", numRead == -1);
+				}
+			}
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail("Test erroneous: " + e.getMessage());
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/test/java/org/apache/flink/types/parser/ShortParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/ShortParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/ShortParserTest.java
index 3f4cd02..baea30f 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/ShortParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/ShortParserTest.java
@@ -43,7 +43,7 @@ public class ShortParserTest extends ParserTestBase<Short> {
 	public String[] getInvalidTestValues() {
 		return new String[] {
 			"a", "1569a86", "-57-6", "7-877678", String.valueOf(Short.MAX_VALUE) + "0", String.valueOf(Integer.MIN_VALUE),
-			String.valueOf(Short.MAX_VALUE + 1), String.valueOf(Short.MIN_VALUE - 1)
+			String.valueOf(Short.MAX_VALUE + 1), String.valueOf(Short.MIN_VALUE - 1), " 1", "2 ", " ", "\t"
 		};
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-core/src/test/java/org/apache/flink/types/parser/ShortValueParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/ShortValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/ShortValueParserTest.java
index 44f1589..c56df83 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/ShortValueParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/ShortValueParserTest.java
@@ -46,7 +46,7 @@ public class ShortValueParserTest extends ParserTestBase<ShortValue> {
 	public String[] getInvalidTestValues() {
 		return new String[] {
 			"a", "1569a86", "-57-6", "7-877678", String.valueOf(Short.MAX_VALUE) + "0", String.valueOf(Integer.MIN_VALUE),
-			String.valueOf(Short.MAX_VALUE + 1), String.valueOf(Short.MIN_VALUE - 1)
+			String.valueOf(Short.MAX_VALUE + 1), String.valueOf(Short.MIN_VALUE - 1), " 1", "2 ", " ", "\t"
 		};
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/39d526e6/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
index bff3fec..3d87984 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.java.io;
 
 import com.google.common.base.Charsets;
 
+import org.apache.flink.api.common.io.ParseException;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.*;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
@@ -320,24 +321,24 @@ public class CsvInputFormatTest {
 			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
 		}
 	}
-	
+
 	@Test
-	public void testIntegerFieldsl() throws IOException {
+	public void testIntegerFields() throws IOException {
 		try {
 			final String fileContent = "111|222|333|444|555\n666|777|888|999|000|\n";
-			final FileInputSplit split = createTempFile(fileContent);	
+			final FileInputSplit split = createTempFile(fileContent);
 
 			final TupleTypeInfo<Tuple5<Integer, Integer, Integer, Integer, Integer>> typeInfo =
-					TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class, Integer.class, Integer.class, Integer.class);
+				TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class, Integer.class, Integer.class, Integer.class);
 			final CsvInputFormat<Tuple5<Integer, Integer, Integer, Integer, Integer>> format = new CsvInputFormat<Tuple5<Integer, Integer, Integer, Integer, Integer>>(PATH, typeInfo);
-			
+
 			format.setFieldDelimiter("|");
 
 			format.configure(new Configuration());
 			format.open(split);
-			
+
 			Tuple5<Integer, Integer, Integer, Integer, Integer> result = new Tuple5<Integer, Integer, Integer, Integer, Integer>();
-			
+
 			result = format.nextRecord(result);
 			assertNotNull(result);
 			assertEquals(Integer.valueOf(111), result.f0);
@@ -345,7 +346,7 @@ public class CsvInputFormatTest {
 			assertEquals(Integer.valueOf(333), result.f2);
 			assertEquals(Integer.valueOf(444), result.f3);
 			assertEquals(Integer.valueOf(555), result.f4);
-			
+
 			result = format.nextRecord(result);
 			assertNotNull(result);
 			assertEquals(Integer.valueOf(666), result.f0);
@@ -353,6 +354,104 @@ public class CsvInputFormatTest {
 			assertEquals(Integer.valueOf(888), result.f2);
 			assertEquals(Integer.valueOf(999), result.f3);
 			assertEquals(Integer.valueOf(000), result.f4);
+
+			result = format.nextRecord(result);
+			assertNull(result);
+			assertTrue(format.reachedEnd());
+		}
+		catch (Exception ex) {
+			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
+		}
+	}
+
+	@Test
+	public void testEmptyFields() throws IOException {
+		try {
+			final String fileContent = "|0|0|0|0|0|\n" +
+				"1||1|1|1|1|\n" +
+				"2|2||2|2|2|\n" +
+				"3|3|3| |3|3|\n" +
+				"4|4|4|4||4|\n" +
+				"5|5|5|5|5||\n";
+			final FileInputSplit split = createTempFile(fileContent);
+
+			final TupleTypeInfo<Tuple6<Short, Integer, Long, Float, Double, Byte>> typeInfo =
+				TupleTypeInfo.getBasicTupleTypeInfo(Short.class, Integer.class, Long.class, Float.class, Double.class, Byte.class);
+			final CsvInputFormat<Tuple6<Short, Integer, Long, Float, Double, Byte>> format = new CsvInputFormat<Tuple6<Short, Integer, Long, Float, Double, Byte>>(PATH, typeInfo);
+
+			format.setFieldDelimiter("|");
+
+			format.configure(new Configuration());
+			format.open(split);
+
+			Tuple6<Short, Integer, Long, Float, Double, Byte> result = new Tuple6<Short, Integer, Long, Float, Double, Byte>();
+
+			try {
+				result = format.nextRecord(result);
+				fail("Empty String Parse Exception was not thrown! (ShortParser)");
+			} catch (ParseException e) {}
+			try {
+				result = format.nextRecord(result);
+				fail("Empty String Parse Exception was not thrown! (IntegerParser)");
+			} catch (ParseException e) {}
+			try {
+				result = format.nextRecord(result);
+				fail("Empty String Parse Exception was not thrown! (LongParser)");
+			} catch (ParseException e) {}
+			try {
+				result = format.nextRecord(result);
+				fail("Empty String Parse Exception was not thrown! (FloatParser)");
+			} catch (ParseException e) {}
+			try {
+				result = format.nextRecord(result);
+				fail("Empty String Parse Exception was not thrown! (DoubleParser)");
+			} catch (ParseException e) {}
+			try {
+				result = format.nextRecord(result);
+				fail("Empty String Parse Exception was not thrown! (ByteParser)");
+			} catch (ParseException e) {}
+
+			result = format.nextRecord(result);
+			assertNull(result);
+			assertTrue(format.reachedEnd());
+		}
+		catch (Exception ex) {
+			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
+		}
+	}
+
+	@Test
+	public void testDoubleFields() throws IOException {
+		try {
+			final String fileContent = "11.1|22.2|33.3|44.4|55.5\n66.6|77.7|88.8|99.9|00.0|\n";
+			final FileInputSplit split = createTempFile(fileContent);	
+
+			final TupleTypeInfo<Tuple5<Double, Double, Double, Double, Double>> typeInfo =
+					TupleTypeInfo.getBasicTupleTypeInfo(Double.class, Double.class, Double.class, Double.class, Double.class);
+			final CsvInputFormat<Tuple5<Double, Double, Double, Double, Double>> format = new CsvInputFormat<Tuple5<Double, Double, Double, Double, Double>>(PATH, typeInfo);
+			
+			format.setFieldDelimiter("|");
+
+			format.configure(new Configuration());
+			format.open(split);
+			
+			Tuple5<Double, Double, Double, Double, Double> result = new Tuple5<Double, Double, Double, Double, Double>();
+			
+			result = format.nextRecord(result);
+			assertNotNull(result);
+			assertEquals(Double.valueOf(11.1), result.f0);
+			assertEquals(Double.valueOf(22.2), result.f1);
+			assertEquals(Double.valueOf(33.3), result.f2);
+			assertEquals(Double.valueOf(44.4), result.f3);
+			assertEquals(Double.valueOf(55.5), result.f4);
+			
+			result = format.nextRecord(result);
+			assertNotNull(result);
+			assertEquals(Double.valueOf(66.6), result.f0);
+			assertEquals(Double.valueOf(77.7), result.f1);
+			assertEquals(Double.valueOf(88.8), result.f2);
+			assertEquals(Double.valueOf(99.9), result.f3);
+			assertEquals(Double.valueOf(00.0), result.f4);
 			
 			result = format.nextRecord(result);
 			assertNull(result);
@@ -367,7 +466,7 @@ public class CsvInputFormatTest {
 	public void testReadFirstN() throws IOException {
 		try {
 			final String fileContent = "111|222|333|444|555|\n666|777|888|999|000|\n";
-			final FileInputSplit split = createTempFile(fileContent);	
+			final FileInputSplit split = createTempFile(fileContent);
 
 			final TupleTypeInfo<Tuple2<Integer, Integer>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class);
 			final CsvInputFormat<Tuple2<Integer, Integer>> format = new CsvInputFormat<Tuple2<Integer, Integer>>(PATH, typeInfo);
@@ -490,8 +589,9 @@ public class CsvInputFormatTest {
 			
 			format.setFieldDelimiter("&&");
 
-			format.setFields(new boolean[] { true, false, false, true, false, false, false, true }, new Class<?>[] { Integer.class,
-					Integer.class, Integer.class });
+			format.setFields(new boolean[]{true, false, false, true, false, false, false, true}, new 
+				Class<?>[]{Integer.class,
+				Integer.class, Integer.class});
 			
 			format.configure(new Configuration());
 			format.open(split);
@@ -547,7 +647,7 @@ public class CsvInputFormatTest {
 
 		Object[][] failures = {
 				{"\"string\" trailing", FieldParser.ParseErrorState.UNQUOTED_CHARS_AFTER_QUOTED_STRING},
-				{"\"unterminated ", 		FieldParser.ParseErrorState.UNTERMINATED_QUOTED_STRING}
+				{"\"unterminated ", FieldParser.ParseErrorState.UNTERMINATED_QUOTED_STRING}
 		};
 
 		for (Object[] failure : failures) {
@@ -809,7 +909,8 @@ public class CsvInputFormatTest {
 		@SuppressWarnings("unchecked")
 		TypeInformation<PojoItem> typeInfo = (TypeInformation<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
 		CsvInputFormat<PojoItem> inputFormat = new CsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
-		inputFormat.setFields(new boolean[]{true, false, true, false, true, true}, new Class[]{Integer.class, String.class, Double.class, String.class});
+		inputFormat.setFields(new boolean[]{true, false, true, false, true, true}, new Class[]{Integer.class, String
+			.class, Double.class, String.class});
 
 		inputFormat.configure(new Configuration());
 		FileInputSplit[] splits = inputFormat.createInputSplits(1);