You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2016/09/22 09:13:38 UTC

flink git commit: [FLINK-4248] [core] [table] CsvTableSource does not support reading SqlTimeTypeInfo types

Repository: flink
Updated Branches:
  refs/heads/master b51605686 -> 3507d59f9


[FLINK-4248] [core] [table] CsvTableSource does not support reading SqlTimeTypeInfo types

This closes #2303.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3507d59f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3507d59f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3507d59f

Branch: refs/heads/master
Commit: 3507d59f969485dd735487e6bf3eb893b2e3d8ed
Parents: b516056
Author: twalthr <tw...@apache.org>
Authored: Wed Jul 27 14:51:07 2016 +0200
Committer: twalthr <tw...@apache.org>
Committed: Thu Sep 22 11:09:42 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/types/parser/BigDecParser.java |  31 ++----
 .../apache/flink/types/parser/BigIntParser.java |  44 +++-----
 .../apache/flink/types/parser/DoubleParser.java |  44 +++-----
 .../flink/types/parser/DoubleValueParser.java   |  27 ++---
 .../apache/flink/types/parser/FieldParser.java  |  48 +++++++++
 .../apache/flink/types/parser/FloatParser.java  |  48 +++------
 .../flink/types/parser/FloatValueParser.java    |  27 ++---
 .../flink/types/parser/SqlDateParser.java       | 108 +++++++++++++++++++
 .../flink/types/parser/SqlTimeParser.java       | 102 ++++++++++++++++++
 .../flink/types/parser/SqlTimestampParser.java  | 108 +++++++++++++++++++
 .../typeutils/base/SqlTimeComparatorTest.java   |   2 +-
 .../typeutils/base/SqlTimeSerializerTest.java   |   2 +-
 .../base/SqlTimestampComparatorTest.java        |   6 +-
 .../base/SqlTimestampSerializerTest.java        |   6 +-
 .../flink/types/parser/SqlDateParserTest.java   |  64 +++++++++++
 .../flink/types/parser/SqlTimeParserTest.java   |  63 +++++++++++
 .../types/parser/SqlTimestampParserTest.java    |  69 ++++++++++++
 .../runtime/io/RowCsvInputFormatTest.scala      |  42 +++++++-
 18 files changed, 675 insertions(+), 166 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/main/java/org/apache/flink/types/parser/BigDecParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/BigDecParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/BigDecParser.java
index 46a07fa..9c9f57f 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/BigDecParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/BigDecParser.java
@@ -35,42 +35,27 @@ public class BigDecParser extends FieldParser<BigDecimal> {
 
 	@Override
 	public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, BigDecimal reusable) {
-		int i = startPos;
-
-		final int delimLimit = limit - delimiter.length + 1;
-
-		while (i < limit) {
-			if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
-				if (i == startPos) {
-					setErrorState(ParseErrorState.EMPTY_COLUMN);
-					return -1;
-				}
-				break;
-			}
-			i++;
-		}
-
-		if (i > startPos &&
-				(Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[(i - 1)]))) {
-			setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);
+		final int endPos = nextStringEndPos(bytes, startPos, limit, delimiter);
+		if (endPos < 0) {
 			return -1;
 		}
 
 		try {
-			final int length = i - startPos;
+			final int length = endPos - startPos;
 			if (reuse == null || reuse.length < length) {
 				reuse = new char[length];
 			}
 			for (int j = 0; j < length; j++) {
 				final byte b = bytes[startPos + j];
 				if ((b < '0' || b > '9') && b != '-' && b != '+' && b != '.' && b != 'E' && b != 'e') {
-					throw new NumberFormatException();
+					setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);
+					return -1;
 				}
 				reuse[j] = (char) bytes[startPos + j];
 			}
 
 			this.result = new BigDecimal(reuse, 0, length);
-			return (i == limit) ? limit : i + delimiter.length;
+			return (endPos == limit) ? limit : endPos + delimiter.length;
 		} catch (NumberFormatException e) {
 			setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR);
 			return -1;
@@ -96,7 +81,7 @@ public class BigDecParser extends FieldParser<BigDecimal> {
 	 * @param startPos The offset to start the parsing.
 	 * @param length   The length of the byte sequence (counting from the offset).
 	 * @return The parsed value.
-	 * @throws NumberFormatException Thrown when the value cannot be parsed because the text 
+	 * @throws IllegalArgumentException Thrown when the value cannot be parsed because the text
 	 * represents not a correct number.
 	 */
 	public static final BigDecimal parseField(byte[] bytes, int startPos, int length) {
@@ -113,7 +98,7 @@ public class BigDecParser extends FieldParser<BigDecimal> {
 	 * @param length    The length of the byte sequence (counting from the offset).
 	 * @param delimiter The delimiter that terminates the field.
 	 * @return The parsed value.
-	 * @throws NumberFormatException Thrown when the value cannot be parsed because the text 
+	 * @throws IllegalArgumentException Thrown when the value cannot be parsed because the text
 	 * represents not a correct number.
 	 */
 	public static final BigDecimal parseField(byte[] bytes, int startPos, int length, char delimiter) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java
index 13361c1..11e459a 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java
@@ -34,31 +34,21 @@ public class BigIntParser extends FieldParser<BigInteger> {
 
 	@Override
 	public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, BigInteger reusable) {
-		int i = startPos;
-
-		final int delimLimit = limit - delimiter.length + 1;
-
-		while (i < limit) {
-			if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
-				if (i == startPos) {
-					setErrorState(ParseErrorState.EMPTY_COLUMN);
-					return -1;
-				}
-				break;
-			}
-			i++;
+		final int endPos = nextStringEndPos(bytes, startPos, limit, delimiter);
+		if (endPos < 0) {
+			return -1;
 		}
 
-		if (i > startPos &&
-				(Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[(i - 1)]))) {
+		if (endPos > startPos &&
+				(Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[(endPos - 1)]))) {
 			setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);
 			return -1;
 		}
 
-		String str = new String(bytes, startPos, i - startPos);
+		String str = new String(bytes, startPos, endPos - startPos);
 		try {
 			this.result = new BigInteger(str);
-			return (i == limit) ? limit : i + delimiter.length;
+			return (endPos == limit) ? limit : endPos + delimiter.length;
 		} catch (NumberFormatException e) {
 			setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR);
 			return -1;
@@ -84,7 +74,7 @@ public class BigIntParser extends FieldParser<BigInteger> {
 	 * @param startPos The offset to start the parsing.
 	 * @param length   The length of the byte sequence (counting from the offset).
 	 * @return The parsed value.
-	 * @throws NumberFormatException Thrown when the value cannot be parsed because the text 
+	 * @throws IllegalArgumentException Thrown when the value cannot be parsed because the text
 	 * represents not a correct number.
 	 */
 	public static final BigInteger parseField(byte[] bytes, int startPos, int length) {
@@ -101,26 +91,18 @@ public class BigIntParser extends FieldParser<BigInteger> {
 	 * @param length    The length of the byte sequence (counting from the offset).
 	 * @param delimiter The delimiter that terminates the field.
 	 * @return The parsed value.
-	 * @throws NumberFormatException Thrown when the value cannot be parsed because the text 
+	 * @throws IllegalArgumentException Thrown when the value cannot be parsed because the text
 	 * represents not a correct number.
 	 */
 	public static final BigInteger parseField(byte[] bytes, int startPos, int length, char delimiter) {
-		if (length <= 0) {
-			throw new NumberFormatException("Invalid input: Empty string");
-		}
-		int i = 0;
-		final byte delByte = (byte) delimiter;
-
-		while (i < length && bytes[startPos + i] != delByte) {
-			i++;
-		}
+		final int limitedLen = nextStringLength(bytes, startPos, length, delimiter);
 
-		if (i > 0 &&
-				(Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[startPos + i - 1]))) {
+		if (limitedLen > 0 &&
+				(Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[startPos + limitedLen - 1]))) {
 			throw new NumberFormatException("There is leading or trailing whitespace in the numeric field.");
 		}
 
-		String str = new String(bytes, startPos, i);
+		final String str = new String(bytes, startPos, limitedLen);
 		return new BigInteger(str);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
index 8af496d..2474adf 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
@@ -33,31 +33,21 @@ public class DoubleParser extends FieldParser<Double> {
 
 	@Override
 	public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Double reusable) {
-		int i = startPos;
-
-		final int delimLimit = limit - delimiter.length + 1;
-
-		while (i < limit) {
-			if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
-				if (i == startPos) {
-					setErrorState(ParseErrorState.EMPTY_COLUMN);
-					return -1;
-				}
-				break;
-			}
-			i++;
+		final int endPos = nextStringEndPos(bytes, startPos, limit, delimiter);
+		if (endPos < 0) {
+			return -1;
 		}
 
-		if (i > startPos &&
-				(Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[(i - 1)]))) {
+		if (endPos > startPos &&
+				(Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[(endPos - 1)]))) {
 			setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);
 			return -1;
 		}
 
-		String str = new String(bytes, startPos, i - startPos);
+		String str = new String(bytes, startPos, endPos - startPos);
 		try {
 			this.result = Double.parseDouble(str);
-			return (i == limit) ? limit : i + delimiter.length;
+			return (endPos == limit) ? limit : endPos + delimiter.length;
 		} catch (NumberFormatException e) {
 			setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR);
 			return -1;
@@ -83,7 +73,7 @@ public class DoubleParser extends FieldParser<Double> {
 	 * @param startPos The offset to start the parsing.
 	 * @param length   The length of the byte sequence (counting from the offset).
 	 * @return The parsed value.
-	 * @throws NumberFormatException Thrown when the value cannot be parsed because the text 
+	 * @throws IllegalArgumentException Thrown when the value cannot be parsed because the text
 	 * represents not a correct number.
 	 */
 	public static final double parseField(byte[] bytes, int startPos, int length) {
@@ -100,26 +90,18 @@ public class DoubleParser extends FieldParser<Double> {
 	 * @param length    The length of the byte sequence (counting from the offset).
 	 * @param delimiter The delimiter that terminates the field.
 	 * @return The parsed value.
-	 * @throws NumberFormatException Thrown when the value cannot be parsed because the text 
+	 * @throws IllegalArgumentException Thrown when the value cannot be parsed because the text
 	 * represents not a correct number.
 	 */
 	public static final double parseField(byte[] bytes, int startPos, int length, char delimiter) {
-		if (length <= 0) {
-			throw new NumberFormatException("Invalid input: Empty string");
-		}
-		int i = 0;
-		final byte delByte = (byte) delimiter;
-
-		while (i < length && bytes[startPos + i] != delByte) {
-			i++;
-		}
+		final int limitedLen = nextStringLength(bytes, startPos, length, delimiter);
 
-		if (i > 0 &&
-				(Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[startPos + i - 1]))) {
+		if (limitedLen > 0 &&
+				(Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[startPos + limitedLen - 1]))) {
 			throw new NumberFormatException("There is leading or trailing whitespace in the numeric field.");
 		}
 
-		String str = new String(bytes, startPos, i);
+		final String str = new String(bytes, startPos, limitedLen);
 		return Double.parseDouble(str);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
index 5c657be..10b43c3 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
@@ -32,34 +32,23 @@ public class DoubleValueParser extends FieldParser<DoubleValue> {
 	
 	@Override
 	public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, DoubleValue reusable) {
-		
-		int i = startPos;
-
-		final int delimLimit = limit - delimiter.length + 1;
-
-		while (i < limit) {
-			if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
-				if (i == startPos) {
-					setErrorState(ParseErrorState.EMPTY_COLUMN);
-					return -1;
-				}
-				break;
-			}
-			i++;
+		final int endPos = nextStringEndPos(bytes, startPos, limit, delimiter);
+		if (endPos < 0) {
+			return -1;
 		}
-		
-		if (i > startPos &&
-				(Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[i - 1]))) {
+
+		if (endPos > startPos &&
+				(Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[(endPos - 1)]))) {
 			setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);
 			return -1;
 		}
 
-		String str = new String(bytes, startPos, i - startPos);
+		String str = new String(bytes, startPos, endPos - startPos);
 		try {
 			double value = Double.parseDouble(str);
 			reusable.setValue(value);
 			this.result = reusable;
-			return (i == limit) ? limit : i + delimiter.length;
+			return (endPos == limit) ? limit : endPos + delimiter.length;
 		}
 		catch (NumberFormatException e) {
 			setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR);

http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
index a1b9c5f..200d239 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
@@ -174,6 +174,49 @@ public abstract class FieldParser<T> {
 	public ParseErrorState getErrorState() {
 		return this.errorState;
 	}
+
+	/**
+	 * Returns the end position of a string. Sets the error state if the column is empty.
+	 *
+	 * @return the end position of the string or -1 if an error occurred
+	 */
+	protected final int nextStringEndPos(byte[] bytes, int startPos, int limit, byte[] delimiter) {
+		int endPos = startPos;
+
+		final int delimLimit = limit - delimiter.length + 1;
+
+		while (endPos < limit) {
+			if (endPos < delimLimit && delimiterNext(bytes, endPos, delimiter)) {
+				if (endPos == startPos) {
+					setErrorState(ParseErrorState.EMPTY_COLUMN);
+					return -1;
+				}
+				break;
+			}
+			endPos++;
+		}
+
+		return endPos;
+	}
+
+	/**
+	 * Returns the length of a string. Throws an exception if the column is empty.
+	 *
+	 * @return the length of the string
+	 */
+	protected static final int nextStringLength(byte[] bytes, int startPos, int length, char delimiter) {
+		if (length <= 0) {
+			throw new IllegalArgumentException("Invalid input: Empty string");
+		}
+		int limitedLength = 0;
+		final byte delByte = (byte) delimiter;
+
+		while (limitedLength < length && bytes[startPos + limitedLength] != delByte) {
+			limitedLength++;
+		}
+
+		return limitedLength;
+	}
 	
 	// --------------------------------------------------------------------------------------------
 	//  Mapping from types to parsers
@@ -222,5 +265,10 @@ public abstract class FieldParser<T> {
 		PARSERS.put(FloatValue.class, FloatValueParser.class);
 		PARSERS.put(DoubleValue.class, DoubleValueParser.class);
 		PARSERS.put(BooleanValue.class, BooleanValueParser.class);
+
+		// SQL date/time types
+		PARSERS.put(java.sql.Time.class, SqlTimeParser.class);
+		PARSERS.put(java.sql.Date.class, SqlDateParser.class);
+		PARSERS.put(java.sql.Timestamp.class, SqlTimestampParser.class);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
index 3304f24..e76484e 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
@@ -30,34 +30,22 @@ public class FloatParser extends FieldParser<Float> {
 	private float result;
 	
 	@Override
-	public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Float 
-		reusable) {
-
-		int i = startPos;
-
-		final int delimLimit = limit - delimiter.length + 1;
-
-		while (i < limit) {
-			if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
-				if (i == startPos) {
-					setErrorState(ParseErrorState.EMPTY_COLUMN);
-					return -1;
-				}
-				break;
-			}
-			i++;
+	public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Float reusable) {
+		final int endPos = nextStringEndPos(bytes, startPos, limit, delimiter);
+		if (endPos < 0) {
+			return -1;
 		}
 
-		if (i > startPos &&
-				(Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[i - 1]))) {
+		if (endPos > startPos &&
+				(Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[endPos - 1]))) {
 			setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);
 			return -1;
 		}
 
-		String str = new String(bytes, startPos, i - startPos);
+		String str = new String(bytes, startPos, endPos - startPos);
 		try {
 			this.result = Float.parseFloat(str);
-			return (i == limit) ? limit : i + delimiter.length;
+			return (endPos == limit) ? limit : endPos + delimiter.length;
 		} catch (NumberFormatException e) {
 			setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR);
 			return -1;
@@ -83,7 +71,7 @@ public class FloatParser extends FieldParser<Float> {
 	 * @param startPos The offset to start the parsing.
 	 * @param length   The length of the byte sequence (counting from the offset).
 	 * @return The parsed value.
-	 * @throws NumberFormatException Thrown when the value cannot be parsed because the text 
+	 * @throws IllegalArgumentException Thrown when the value cannot be parsed because the text
 	 * represents not a correct number.
 	 */
 	public static final float parseField(byte[] bytes, int startPos, int length) {
@@ -100,26 +88,18 @@ public class FloatParser extends FieldParser<Float> {
 	 * @param length    The length of the byte sequence (counting from the offset).
 	 * @param delimiter The delimiter that terminates the field.
 	 * @return The parsed value.
-	 * @throws NumberFormatException Thrown when the value cannot be parsed because the text 
+	 * @throws IllegalArgumentException Thrown when the value cannot be parsed because the text
 	 * represents not a correct number.
 	 */
 	public static final float parseField(byte[] bytes, int startPos, int length, char delimiter) {
-		if (length <= 0) {
-			throw new NumberFormatException("Invalid input: Empty string");
-		}
-		int i = 0;
-		final byte delByte = (byte) delimiter;
-
-		while (i < length && bytes[startPos + i] != delByte) {
-			i++;
-		}
+		final int limitedLen = nextStringLength(bytes, startPos, length, delimiter);
 
-		if (i > 0 &&
-				(Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[startPos + i - 1]))) {
+		if (limitedLen > 0 &&
+				(Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[startPos + limitedLen - 1]))) {
 			throw new NumberFormatException("There is leading or trailing whitespace in the numeric field.");
 		}
 
-		String str = new String(bytes, startPos, i);
+		final String str = new String(bytes, startPos, limitedLen);
 		return Float.parseFloat(str);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
index 26ee47b..a834f22 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
@@ -32,34 +32,23 @@ public class FloatValueParser extends FieldParser<FloatValue> {
 	
 	@Override
 	public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, FloatValue reusable) {
-		
-		int i = startPos;
-
-		final int delimLimit = limit - delimiter.length + 1;
-
-		while (i < limit) {
-			if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
-				if (i == startPos) {
-					setErrorState(ParseErrorState.EMPTY_COLUMN);
-					return -1;
-				}
-				break;
-			}
-			i++;
+		final int endPos = nextStringEndPos(bytes, startPos, limit, delimiter);
+		if (endPos < 0) {
+			return -1;
 		}
-		
-		if (i > startPos &&
-				(Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[i - 1]))) {
+
+		if (endPos > startPos &&
+				(Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[endPos - 1]))) {
 			setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);
 			return -1;
 		}
 
-		String str = new String(bytes, startPos, i - startPos);
+		String str = new String(bytes, startPos, endPos - startPos);
 		try {
 			float value = Float.parseFloat(str);
 			reusable.setValue(value);
 			this.result = reusable;
-			return (i == limit) ? limit : i + delimiter.length;
+			return (endPos == limit) ? limit : endPos + delimiter.length;
 		}
 		catch (NumberFormatException e) {
 			setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR);

http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/main/java/org/apache/flink/types/parser/SqlDateParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/SqlDateParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/SqlDateParser.java
new file mode 100644
index 0000000..859dcf8
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/SqlDateParser.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.types.parser;
+
+import java.sql.Date;
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Parses a text field into a {@link java.sql.Date}.
+ */
+@PublicEvolving
+public class SqlDateParser extends FieldParser<Date> {
+
+	private static final Date DATE_INSTANCE = new Date(0L);
+
+	private Date result;
+
+	@Override
+	public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Date reusable) {
+		final int endPos = nextStringEndPos(bytes, startPos, limit, delimiter);
+		if (endPos < 0) {
+			return -1;
+		}
+
+		if (endPos > startPos &&
+				(Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[(endPos - 1)]))) {
+			setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);
+			return -1;
+		}
+
+		String str = new String(bytes, startPos, endPos - startPos);
+		try {
+			this.result = Date.valueOf(str);
+			return (endPos == limit) ? limit : endPos + delimiter.length;
+		} catch (IllegalArgumentException e) {
+			setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR);
+			return -1;
+		}
+	}
+
+	@Override
+	public Date createValue() {
+		return DATE_INSTANCE;
+	}
+
+	@Override
+	public Date getLastResult() {
+		return this.result;
+	}
+
+	/**
+	 * Static utility to parse a field of type Date from a byte sequence that represents text
+	 * characters
+	 * (such as when read from a file stream).
+	 *
+	 * @param bytes    The bytes containing the text data that should be parsed.
+	 * @param startPos The offset to start the parsing.
+	 * @param length   The length of the byte sequence (counting from the offset).
+	 * @return The parsed value.
+	 * @throws IllegalArgumentException Thrown when the value cannot be parsed because the text
+	 * represents not a correct number.
+	 */
+	public static final Date parseField(byte[] bytes, int startPos, int length) {
+		return parseField(bytes, startPos, length, (char) 0xffff);
+	}
+
+	/**
+	 * Static utility to parse a field of type Date from a byte sequence that represents text
+	 * characters
+	 * (such as when read from a file stream).
+	 *
+	 * @param bytes     The bytes containing the text data that should be parsed.
+	 * @param startPos  The offset to start the parsing.
+	 * @param length    The length of the byte sequence (counting from the offset).
+	 * @param delimiter The delimiter that terminates the field.
+	 * @return The parsed value.
+	 * @throws IllegalArgumentException Thrown when the value cannot be parsed because the text
+	 * represents not a correct number.
+	 */
+	public static final Date parseField(byte[] bytes, int startPos, int length, char delimiter) {
+		final int limitedLen = nextStringLength(bytes, startPos, length, delimiter);
+
+		if (limitedLen > 0 &&
+				(Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[startPos + limitedLen - 1]))) {
+			throw new NumberFormatException("There is leading or trailing whitespace in the numeric field.");
+		}
+
+		final String str = new String(bytes, startPos, limitedLen);
+		return Date.valueOf(str);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimeParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimeParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimeParser.java
new file mode 100644
index 0000000..fbddadc
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimeParser.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.types.parser;
+
+import java.sql.Time;
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Parses a text field into a {@link Time}.
+ */
+@PublicEvolving
+public class SqlTimeParser extends FieldParser<Time> {
+
+	private static final Time TIME_INSTANCE = new Time(0L);
+
+	private Time result;
+
+	@Override
+	public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Time reusable) {
+		final int endPos = nextStringEndPos(bytes, startPos, limit, delimiter);
+		if (endPos < 0) {
+			return -1;
+		}
+
+		String str = new String(bytes, startPos, endPos - startPos);
+		try {
+			this.result = Time.valueOf(str);
+			return (endPos == limit) ? limit : endPos + delimiter.length;
+		} catch (IllegalArgumentException e) {
+			setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR);
+			return -1;
+		}
+	}
+
+	@Override
+	public Time createValue() {
+		return TIME_INSTANCE;
+	}
+
+	@Override
+	public Time getLastResult() {
+		return this.result;
+	}
+
+	/**
+	 * Static utility to parse a field of type Time from a byte sequence that represents text
+	 * characters
+	 * (such as when read from a file stream).
+	 *
+	 * @param bytes    The bytes containing the text data that should be parsed.
+	 * @param startPos The offset to start the parsing.
+	 * @param length   The length of the byte sequence (counting from the offset).
+	 * @return The parsed value.
+	 * @throws IllegalArgumentException Thrown when the value cannot be parsed because the text
+	 * represents not a correct number.
+	 */
+	public static final Time parseField(byte[] bytes, int startPos, int length) {
+		return parseField(bytes, startPos, length, (char) 0xffff);
+	}
+
+	/**
+	 * Static utility to parse a field of type Time from a byte sequence that represents text
+	 * characters
+	 * (such as when read from a file stream).
+	 *
+	 * @param bytes     The bytes containing the text data that should be parsed.
+	 * @param startPos  The offset to start the parsing.
+	 * @param length    The length of the byte sequence (counting from the offset).
+	 * @param delimiter The delimiter that terminates the field.
+	 * @return The parsed value.
+	 * @throws IllegalArgumentException Thrown when the value cannot be parsed because the text
+	 * represents not a correct number.
+	 */
+	public static final Time parseField(byte[] bytes, int startPos, int length, char delimiter) {
+		final int limitedLen = nextStringLength(bytes, startPos, length, delimiter);
+
+		if (limitedLen > 0 &&
+				(Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[startPos + limitedLen - 1]))) {
+			throw new NumberFormatException("There is leading or trailing whitespace in the numeric field.");
+		}
+
+		final String str = new String(bytes, startPos, limitedLen);
+		return Time.valueOf(str);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimestampParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimestampParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimestampParser.java
new file mode 100644
index 0000000..0bcb602
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimestampParser.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.types.parser;
+
+import java.sql.Timestamp;
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Parses a text field into a {@link Timestamp}.
+ */
+@PublicEvolving
+public class SqlTimestampParser extends FieldParser<Timestamp> {
+
+	private static final Timestamp TIMESTAMP_INSTANCE = new Timestamp(0L);
+
+	private Timestamp result;
+
+	@Override
+	public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Timestamp reusable) {
+		final int endPos = nextStringEndPos(bytes, startPos, limit, delimiter);
+		if (endPos < 0) {
+			return -1;
+		}
+
+		if (endPos > startPos &&
+				(Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[(endPos - 1)]))) {
+			setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);
+			return -1;
+		}
+
+		String str = new String(bytes, startPos, endPos - startPos);
+		try {
+			this.result = Timestamp.valueOf(str);
+			return (endPos == limit) ? limit : endPos + delimiter.length;
+		} catch (IllegalArgumentException e) {
+			setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR);
+			return -1;
+		}
+	}
+
+	@Override
+	public Timestamp createValue() {
+		return TIMESTAMP_INSTANCE;
+	}
+
+	@Override
+	public Timestamp getLastResult() {
+		return this.result;
+	}
+
+	/**
+	 * Static utility to parse a field of type Timestamp from a byte sequence that represents text
+	 * characters
+	 * (such as when read from a file stream).
+	 *
+	 * @param bytes    The bytes containing the text data that should be parsed.
+	 * @param startPos The offset to start the parsing.
+	 * @param length   The length of the byte sequence (counting from the offset).
+	 * @return The parsed value.
+	 * @throws IllegalArgumentException Thrown when the value cannot be parsed because the text
+	 * represents not a correct number.
+	 */
+	public static final Timestamp parseField(byte[] bytes, int startPos, int length) {
+		return parseField(bytes, startPos, length, (char) 0xffff);
+	}
+
+	/**
+	 * Static utility to parse a field of type Timestamp from a byte sequence that represents text
+	 * characters
+	 * (such as when read from a file stream).
+	 *
+	 * @param bytes     The bytes containing the text data that should be parsed.
+	 * @param startPos  The offset to start the parsing.
+	 * @param length    The length of the byte sequence (counting from the offset).
+	 * @param delimiter The delimiter that terminates the field.
+	 * @return The parsed value.
+	 * @throws IllegalArgumentException Thrown when the value cannot be parsed because the text
+	 * represents not a correct number.
+	 */
+	public static final Timestamp parseField(byte[] bytes, int startPos, int length, char delimiter) {
+		final int limitedLen = nextStringLength(bytes, startPos, length, delimiter);
+
+		if (limitedLen > 0 &&
+				(Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[startPos + limitedLen - 1]))) {
+			throw new NumberFormatException("There is leading or trailing whitespace in the numeric field.");
+		}
+
+		final String str = new String(bytes, startPos, limitedLen);
+		return Timestamp.valueOf(str);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimeComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimeComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimeComparatorTest.java
index 2b5cfdf..8fb3319 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimeComparatorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimeComparatorTest.java
@@ -40,7 +40,7 @@ public class SqlTimeComparatorTest extends ComparatorTestBase<Time> {
 	protected Time[] getSortedTestData() {
 		return new Time[] {
 			Time.valueOf("00:00:00"),
-			Time.valueOf("02:42:85"),
+			Time.valueOf("02:42:25"),
 			Time.valueOf("14:15:59"),
 			Time.valueOf("18:00:45")
 		};

http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimeSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimeSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimeSerializerTest.java
index 4d16050..bfac789 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimeSerializerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimeSerializerTest.java
@@ -47,7 +47,7 @@ public class SqlTimeSerializerTest extends SerializerTestBase<Time> {
 		return new Time[] {
 			new Time(0L),
 			Time.valueOf("00:00:00"),
-			Time.valueOf("02:42:85"),
+			Time.valueOf("02:42:25"),
 			Time.valueOf("14:15:59"),
 			Time.valueOf("18:00:45")
 		};

http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimestampComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimestampComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimestampComparatorTest.java
index 0b8d294..e182d0a 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimestampComparatorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimestampComparatorTest.java
@@ -40,9 +40,9 @@ public class SqlTimestampComparatorTest extends ComparatorTestBase<Timestamp> {
 	protected Timestamp[] getSortedTestData() {
 		return new Timestamp[] {
 			Timestamp.valueOf("1970-01-01 00:00:00.000"),
-			Timestamp.valueOf("1990-10-14 02:42:85.123"),
-			Timestamp.valueOf("1990-10-14 02:42:85.123000001"),
-			Timestamp.valueOf("1990-10-14 02:42:85.123000002"),
+			Timestamp.valueOf("1990-10-14 02:42:25.123"),
+			Timestamp.valueOf("1990-10-14 02:42:25.123000001"),
+			Timestamp.valueOf("1990-10-14 02:42:25.123000002"),
 			Timestamp.valueOf("2013-08-12 14:15:59.478"),
 			Timestamp.valueOf("2013-08-12 14:15:59.479"),
 			Timestamp.valueOf("2040-05-12 18:00:45.999")

http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimestampSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimestampSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimestampSerializerTest.java
index 70172d5..e825eaa 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimestampSerializerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimestampSerializerTest.java
@@ -47,9 +47,9 @@ public class SqlTimestampSerializerTest extends SerializerTestBase<Timestamp> {
 		return new Timestamp[] {
 			new Timestamp(0L),
 			Timestamp.valueOf("1970-01-01 00:00:00.000"),
-			Timestamp.valueOf("1990-10-14 02:42:85.123"),
-			Timestamp.valueOf("1990-10-14 02:42:85.123000001"),
-			Timestamp.valueOf("1990-10-14 02:42:85.123000002"),
+			Timestamp.valueOf("1990-10-14 02:42:25.123"),
+			Timestamp.valueOf("1990-10-14 02:42:25.123000001"),
+			Timestamp.valueOf("1990-10-14 02:42:25.123000002"),
 			Timestamp.valueOf("2013-08-12 14:15:59.478"),
 			Timestamp.valueOf("2013-08-12 14:15:59.479"),
 			Timestamp.valueOf("2040-05-12 18:00:45.999")

http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/test/java/org/apache/flink/types/parser/SqlDateParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/SqlDateParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/SqlDateParserTest.java
new file mode 100644
index 0000000..25015cd
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/SqlDateParserTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.types.parser;
+
+
+import java.sql.Date;
+
+public class SqlDateParserTest extends ParserTestBase<Date> {
+
+	@Override
+	public String[] getValidTestValues() {
+		return new String[] {
+			"1970-01-01", "1990-10-14", "2013-08-12", "2040-05-12", "2040-5-12", "1970-1-1",
+		};
+	}
+
+	@Override
+	public Date[] getValidTestResults() {
+		return new Date[] {
+			Date.valueOf("1970-01-01"), Date.valueOf("1990-10-14"), Date.valueOf("2013-08-12"),
+			Date.valueOf("2040-05-12"), Date.valueOf("2040-05-12"), Date.valueOf("1970-01-01")
+		};
+	}
+
+	@Override
+	public String[] getInvalidTestValues() {
+		return new String[] {
+			" 2013-08-12", "2013-08-12 ", "2013-08--12", "13-08-12", "2013/08/12", " ", "\t",
+			"2013-XX-XX", "2000-02-35"
+		};
+	}
+
+	@Override
+	public boolean allowsEmptyField() {
+		return false;
+	}
+
+	@Override
+	public FieldParser<Date> getParser() {
+		return new SqlDateParser();
+	}
+
+	@Override
+	public Class<Date> getTypeClass() {
+		return Date.class;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/test/java/org/apache/flink/types/parser/SqlTimeParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/SqlTimeParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/SqlTimeParserTest.java
new file mode 100644
index 0000000..06ebd3d
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/SqlTimeParserTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.types.parser;
+
+
+import java.sql.Time;
+
+public class SqlTimeParserTest extends ParserTestBase<Time> {
+
+	@Override
+	public String[] getValidTestValues() {
+		return new String[] {
+			"00:00:00", "02:42:25", "14:15:51", "18:00:45", "23:59:58", "0:0:0"
+		};
+	}
+
+	@Override
+	public Time[] getValidTestResults() {
+		return new Time[] {
+			Time.valueOf("00:00:00"), Time.valueOf("02:42:25"), Time.valueOf("14:15:51"),
+			Time.valueOf("18:00:45"), Time.valueOf("23:59:58"), Time.valueOf("0:0:0")
+		};
+	}
+
+	@Override
+	public String[] getInvalidTestValues() {
+		return new String[] {
+			" 00:00:00", "00:00:00 ", "00:00::00", "00x00:00", "2013/08/12", " ", "\t"
+		};
+	}
+
+	@Override
+	public boolean allowsEmptyField() {
+		return false;
+	}
+
+	@Override
+	public FieldParser<Time> getParser() {
+		return new SqlTimeParser();
+	}
+
+	@Override
+	public Class<Time> getTypeClass() {
+		return Time.class;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/test/java/org/apache/flink/types/parser/SqlTimestampParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/SqlTimestampParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/SqlTimestampParserTest.java
new file mode 100644
index 0000000..0527606
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/SqlTimestampParserTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.types.parser;
+
+
+import java.sql.Timestamp;
+
+public class SqlTimestampParserTest extends ParserTestBase<Timestamp> {
+
+	@Override
+	public String[] getValidTestValues() {
+		return new String[] {
+			"1970-01-01 00:00:00.000", "1990-10-14 02:42:25", "1990-10-14 02:42:25.123", "1990-10-14 02:42:25.123000001",
+			"1990-10-14 02:42:25.123000002", "2013-08-12 14:15:59.478", "2013-08-12 14:15:59.47",
+			"0000-01-01 00:00:00.000",
+		};
+	}
+
+	@Override
+	public Timestamp[] getValidTestResults() {
+		return new Timestamp[] {
+			Timestamp.valueOf("1970-01-01 00:00:00.000"), Timestamp.valueOf("1990-10-14 02:42:25"), Timestamp.valueOf("1990-10-14 02:42:25.123"),
+			Timestamp.valueOf("1990-10-14 02:42:25.123000001"), Timestamp.valueOf("1990-10-14 02:42:25.123000002"),
+			Timestamp.valueOf("2013-08-12 14:15:59.478"), Timestamp.valueOf("2013-08-12 14:15:59.47"),
+			Timestamp.valueOf("0000-01-01 00:00:00.000")
+		};
+	}
+
+	@Override
+	public String[] getInvalidTestValues() {
+		return new String[] {
+			" 2013-08-12 14:15:59.479", "2013-08-12 14:15:59.479 ", "1970-01-01 00:00::00",
+			"00x00:00", "2013/08/12", "0000-01-01 00:00:00.f00", "2013-08-12 14:15:59.4788888888888888",
+			" ", "\t"
+		};
+	}
+
+	@Override
+	public boolean allowsEmptyField() {
+		return false;
+	}
+
+	@Override
+	public FieldParser<Timestamp> getParser() {
+		return new SqlTimestampParser();
+	}
+
+	@Override
+	public Class<Timestamp> getTypeClass() {
+		return Timestamp.class;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala
index db01b69..d176b79 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala
@@ -20,9 +20,10 @@ package org.apache.flink.api.table.runtime.io
 
 import java.io.{File, FileOutputStream, OutputStreamWriter}
 import java.nio.charset.StandardCharsets
+import java.sql.{Date, Time, Timestamp}
 
 import org.apache.flink.api.common.io.ParseException
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo}
 import org.apache.flink.api.table.Row
 import org.apache.flink.api.table.runtime.io.RowCsvInputFormatTest.{PATH, createTempFile, testRemovingTrailingCR}
 import org.apache.flink.api.table.typeutils.RowTypeInfo
@@ -786,6 +787,45 @@ class RowCsvInputFormatTest {
     assertEquals("\\\"Hello\\\" World", record.productElement(0))
     assertEquals("We are\\\" young", record.productElement(1))
   }
+
+  @Test
+  def testSqlTimeFields() {
+    val fileContent = "1990-10-14|02:42:25|1990-10-14 02:42:25.123|1990-1-4 2:2:5\n" +
+      "1990-10-14|02:42:25|1990-10-14 02:42:25.123|1990-1-4 2:2:5.3\n"
+
+    val split = createTempFile(fileContent)
+
+    val typeInfo = new RowTypeInfo(Seq(
+      SqlTimeTypeInfo.DATE,
+      SqlTimeTypeInfo.TIME,
+      SqlTimeTypeInfo.TIMESTAMP,
+      SqlTimeTypeInfo.TIMESTAMP))
+
+    val format = new RowCsvInputFormat(PATH, rowTypeInfo = typeInfo)
+    format.setFieldDelimiter("|")
+    format.configure(new Configuration)
+    format.open(split)
+
+    var result = new Row(4)
+
+    result = format.nextRecord(result)
+    assertNotNull(result)
+    assertEquals(Date.valueOf("1990-10-14"), result.productElement(0))
+    assertEquals(Time.valueOf("02:42:25"), result.productElement(1))
+    assertEquals(Timestamp.valueOf("1990-10-14 02:42:25.123"), result.productElement(2))
+    assertEquals(Timestamp.valueOf("1990-01-04 02:02:05"), result.productElement(3))
+
+    result = format.nextRecord(result)
+    assertNotNull(result)
+    assertEquals(Date.valueOf("1990-10-14"), result.productElement(0))
+    assertEquals(Time.valueOf("02:42:25"), result.productElement(1))
+    assertEquals(Timestamp.valueOf("1990-10-14 02:42:25.123"), result.productElement(2))
+    assertEquals(Timestamp.valueOf("1990-01-04 02:02:05.3"), result.productElement(3))
+
+    result = format.nextRecord(result)
+    assertNull(result)
+    assertTrue(format.reachedEnd)
+  }
 }
 
 object RowCsvInputFormatTest {