You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2016/09/22 09:13:38 UTC
flink git commit: [FLINK-4248] [core] [table] CsvTableSource does not support reading SqlTimeTypeInfo types
Repository: flink
Updated Branches:
refs/heads/master b51605686 -> 3507d59f9
[FLINK-4248] [core] [table] CsvTableSource does not support reading SqlTimeTypeInfo types
This closes #2303.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3507d59f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3507d59f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3507d59f
Branch: refs/heads/master
Commit: 3507d59f969485dd735487e6bf3eb893b2e3d8ed
Parents: b516056
Author: twalthr <tw...@apache.org>
Authored: Wed Jul 27 14:51:07 2016 +0200
Committer: twalthr <tw...@apache.org>
Committed: Thu Sep 22 11:09:42 2016 +0200
----------------------------------------------------------------------
.../apache/flink/types/parser/BigDecParser.java | 31 ++----
.../apache/flink/types/parser/BigIntParser.java | 44 +++-----
.../apache/flink/types/parser/DoubleParser.java | 44 +++-----
.../flink/types/parser/DoubleValueParser.java | 27 ++---
.../apache/flink/types/parser/FieldParser.java | 48 +++++++++
.../apache/flink/types/parser/FloatParser.java | 48 +++------
.../flink/types/parser/FloatValueParser.java | 27 ++---
.../flink/types/parser/SqlDateParser.java | 108 +++++++++++++++++++
.../flink/types/parser/SqlTimeParser.java | 102 ++++++++++++++++++
.../flink/types/parser/SqlTimestampParser.java | 108 +++++++++++++++++++
.../typeutils/base/SqlTimeComparatorTest.java | 2 +-
.../typeutils/base/SqlTimeSerializerTest.java | 2 +-
.../base/SqlTimestampComparatorTest.java | 6 +-
.../base/SqlTimestampSerializerTest.java | 6 +-
.../flink/types/parser/SqlDateParserTest.java | 64 +++++++++++
.../flink/types/parser/SqlTimeParserTest.java | 63 +++++++++++
.../types/parser/SqlTimestampParserTest.java | 69 ++++++++++++
.../runtime/io/RowCsvInputFormatTest.scala | 42 +++++++-
18 files changed, 675 insertions(+), 166 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/main/java/org/apache/flink/types/parser/BigDecParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/BigDecParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/BigDecParser.java
index 46a07fa..9c9f57f 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/BigDecParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/BigDecParser.java
@@ -35,42 +35,27 @@ public class BigDecParser extends FieldParser<BigDecimal> {
@Override
public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, BigDecimal reusable) {
- int i = startPos;
-
- final int delimLimit = limit - delimiter.length + 1;
-
- while (i < limit) {
- if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
- if (i == startPos) {
- setErrorState(ParseErrorState.EMPTY_COLUMN);
- return -1;
- }
- break;
- }
- i++;
- }
-
- if (i > startPos &&
- (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[(i - 1)]))) {
- setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);
+ final int endPos = nextStringEndPos(bytes, startPos, limit, delimiter);
+ if (endPos < 0) {
return -1;
}
try {
- final int length = i - startPos;
+ final int length = endPos - startPos;
if (reuse == null || reuse.length < length) {
reuse = new char[length];
}
for (int j = 0; j < length; j++) {
final byte b = bytes[startPos + j];
if ((b < '0' || b > '9') && b != '-' && b != '+' && b != '.' && b != 'E' && b != 'e') {
- throw new NumberFormatException();
+ setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);
+ return -1;
}
reuse[j] = (char) bytes[startPos + j];
}
this.result = new BigDecimal(reuse, 0, length);
- return (i == limit) ? limit : i + delimiter.length;
+ return (endPos == limit) ? limit : endPos + delimiter.length;
} catch (NumberFormatException e) {
setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR);
return -1;
@@ -96,7 +81,7 @@ public class BigDecParser extends FieldParser<BigDecimal> {
* @param startPos The offset to start the parsing.
* @param length The length of the byte sequence (counting from the offset).
* @return The parsed value.
- * @throws NumberFormatException Thrown when the value cannot be parsed because the text
+ * @throws IllegalArgumentException Thrown when the value cannot be parsed because the text
* represents not a correct number.
*/
public static final BigDecimal parseField(byte[] bytes, int startPos, int length) {
@@ -113,7 +98,7 @@ public class BigDecParser extends FieldParser<BigDecimal> {
* @param length The length of the byte sequence (counting from the offset).
* @param delimiter The delimiter that terminates the field.
* @return The parsed value.
- * @throws NumberFormatException Thrown when the value cannot be parsed because the text
+ * @throws IllegalArgumentException Thrown when the value cannot be parsed because the text
* represents not a correct number.
*/
public static final BigDecimal parseField(byte[] bytes, int startPos, int length, char delimiter) {
http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java
index 13361c1..11e459a 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java
@@ -34,31 +34,21 @@ public class BigIntParser extends FieldParser<BigInteger> {
@Override
public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, BigInteger reusable) {
- int i = startPos;
-
- final int delimLimit = limit - delimiter.length + 1;
-
- while (i < limit) {
- if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
- if (i == startPos) {
- setErrorState(ParseErrorState.EMPTY_COLUMN);
- return -1;
- }
- break;
- }
- i++;
+ final int endPos = nextStringEndPos(bytes, startPos, limit, delimiter);
+ if (endPos < 0) {
+ return -1;
}
- if (i > startPos &&
- (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[(i - 1)]))) {
+ if (endPos > startPos &&
+ (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[(endPos - 1)]))) {
setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);
return -1;
}
- String str = new String(bytes, startPos, i - startPos);
+ String str = new String(bytes, startPos, endPos - startPos);
try {
this.result = new BigInteger(str);
- return (i == limit) ? limit : i + delimiter.length;
+ return (endPos == limit) ? limit : endPos + delimiter.length;
} catch (NumberFormatException e) {
setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR);
return -1;
@@ -84,7 +74,7 @@ public class BigIntParser extends FieldParser<BigInteger> {
* @param startPos The offset to start the parsing.
* @param length The length of the byte sequence (counting from the offset).
* @return The parsed value.
- * @throws NumberFormatException Thrown when the value cannot be parsed because the text
+ * @throws IllegalArgumentException Thrown when the value cannot be parsed because the text
* represents not a correct number.
*/
public static final BigInteger parseField(byte[] bytes, int startPos, int length) {
@@ -101,26 +91,18 @@ public class BigIntParser extends FieldParser<BigInteger> {
* @param length The length of the byte sequence (counting from the offset).
* @param delimiter The delimiter that terminates the field.
* @return The parsed value.
- * @throws NumberFormatException Thrown when the value cannot be parsed because the text
+ * @throws IllegalArgumentException Thrown when the value cannot be parsed because the text
* represents not a correct number.
*/
public static final BigInteger parseField(byte[] bytes, int startPos, int length, char delimiter) {
- if (length <= 0) {
- throw new NumberFormatException("Invalid input: Empty string");
- }
- int i = 0;
- final byte delByte = (byte) delimiter;
-
- while (i < length && bytes[startPos + i] != delByte) {
- i++;
- }
+ final int limitedLen = nextStringLength(bytes, startPos, length, delimiter);
- if (i > 0 &&
- (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[startPos + i - 1]))) {
+ if (limitedLen > 0 &&
+ (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[startPos + limitedLen - 1]))) {
throw new NumberFormatException("There is leading or trailing whitespace in the numeric field.");
}
- String str = new String(bytes, startPos, i);
+ final String str = new String(bytes, startPos, limitedLen);
return new BigInteger(str);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
index 8af496d..2474adf 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
@@ -33,31 +33,21 @@ public class DoubleParser extends FieldParser<Double> {
@Override
public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Double reusable) {
- int i = startPos;
-
- final int delimLimit = limit - delimiter.length + 1;
-
- while (i < limit) {
- if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
- if (i == startPos) {
- setErrorState(ParseErrorState.EMPTY_COLUMN);
- return -1;
- }
- break;
- }
- i++;
+ final int endPos = nextStringEndPos(bytes, startPos, limit, delimiter);
+ if (endPos < 0) {
+ return -1;
}
- if (i > startPos &&
- (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[(i - 1)]))) {
+ if (endPos > startPos &&
+ (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[(endPos - 1)]))) {
setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);
return -1;
}
- String str = new String(bytes, startPos, i - startPos);
+ String str = new String(bytes, startPos, endPos - startPos);
try {
this.result = Double.parseDouble(str);
- return (i == limit) ? limit : i + delimiter.length;
+ return (endPos == limit) ? limit : endPos + delimiter.length;
} catch (NumberFormatException e) {
setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR);
return -1;
@@ -83,7 +73,7 @@ public class DoubleParser extends FieldParser<Double> {
* @param startPos The offset to start the parsing.
* @param length The length of the byte sequence (counting from the offset).
* @return The parsed value.
- * @throws NumberFormatException Thrown when the value cannot be parsed because the text
+ * @throws IllegalArgumentException Thrown when the value cannot be parsed because the text
* represents not a correct number.
*/
public static final double parseField(byte[] bytes, int startPos, int length) {
@@ -100,26 +90,18 @@ public class DoubleParser extends FieldParser<Double> {
* @param length The length of the byte sequence (counting from the offset).
* @param delimiter The delimiter that terminates the field.
* @return The parsed value.
- * @throws NumberFormatException Thrown when the value cannot be parsed because the text
+ * @throws IllegalArgumentException Thrown when the value cannot be parsed because the text
* represents not a correct number.
*/
public static final double parseField(byte[] bytes, int startPos, int length, char delimiter) {
- if (length <= 0) {
- throw new NumberFormatException("Invalid input: Empty string");
- }
- int i = 0;
- final byte delByte = (byte) delimiter;
-
- while (i < length && bytes[startPos + i] != delByte) {
- i++;
- }
+ final int limitedLen = nextStringLength(bytes, startPos, length, delimiter);
- if (i > 0 &&
- (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[startPos + i - 1]))) {
+ if (limitedLen > 0 &&
+ (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[startPos + limitedLen - 1]))) {
throw new NumberFormatException("There is leading or trailing whitespace in the numeric field.");
}
- String str = new String(bytes, startPos, i);
+ final String str = new String(bytes, startPos, limitedLen);
return Double.parseDouble(str);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
index 5c657be..10b43c3 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
@@ -32,34 +32,23 @@ public class DoubleValueParser extends FieldParser<DoubleValue> {
@Override
public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, DoubleValue reusable) {
-
- int i = startPos;
-
- final int delimLimit = limit - delimiter.length + 1;
-
- while (i < limit) {
- if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
- if (i == startPos) {
- setErrorState(ParseErrorState.EMPTY_COLUMN);
- return -1;
- }
- break;
- }
- i++;
+ final int endPos = nextStringEndPos(bytes, startPos, limit, delimiter);
+ if (endPos < 0) {
+ return -1;
}
-
- if (i > startPos &&
- (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[i - 1]))) {
+
+ if (endPos > startPos &&
+ (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[(endPos - 1)]))) {
setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);
return -1;
}
- String str = new String(bytes, startPos, i - startPos);
+ String str = new String(bytes, startPos, endPos - startPos);
try {
double value = Double.parseDouble(str);
reusable.setValue(value);
this.result = reusable;
- return (i == limit) ? limit : i + delimiter.length;
+ return (endPos == limit) ? limit : endPos + delimiter.length;
}
catch (NumberFormatException e) {
setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR);
http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
index a1b9c5f..200d239 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
@@ -174,6 +174,49 @@ public abstract class FieldParser<T> {
public ParseErrorState getErrorState() {
return this.errorState;
}
+
+ /**
+ * Returns the end position of a string. Sets the error state if the column is empty.
+ *
+ * @return the end position of the string or -1 if an error occurred
+ */
+ protected final int nextStringEndPos(byte[] bytes, int startPos, int limit, byte[] delimiter) {
+ int endPos = startPos;
+
+ final int delimLimit = limit - delimiter.length + 1;
+
+ while (endPos < limit) {
+ if (endPos < delimLimit && delimiterNext(bytes, endPos, delimiter)) {
+ if (endPos == startPos) {
+ setErrorState(ParseErrorState.EMPTY_COLUMN);
+ return -1;
+ }
+ break;
+ }
+ endPos++;
+ }
+
+ return endPos;
+ }
+
+ /**
+ * Returns the length of a string. Throws an exception if the column is empty.
+ *
+ * @return the length of the string
+ */
+ protected static final int nextStringLength(byte[] bytes, int startPos, int length, char delimiter) {
+ if (length <= 0) {
+ throw new IllegalArgumentException("Invalid input: Empty string");
+ }
+ int limitedLength = 0;
+ final byte delByte = (byte) delimiter;
+
+ while (limitedLength < length && bytes[startPos + limitedLength] != delByte) {
+ limitedLength++;
+ }
+
+ return limitedLength;
+ }
// --------------------------------------------------------------------------------------------
// Mapping from types to parsers
@@ -222,5 +265,10 @@ public abstract class FieldParser<T> {
PARSERS.put(FloatValue.class, FloatValueParser.class);
PARSERS.put(DoubleValue.class, DoubleValueParser.class);
PARSERS.put(BooleanValue.class, BooleanValueParser.class);
+
+ // SQL date/time types
+ PARSERS.put(java.sql.Time.class, SqlTimeParser.class);
+ PARSERS.put(java.sql.Date.class, SqlDateParser.class);
+ PARSERS.put(java.sql.Timestamp.class, SqlTimestampParser.class);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
index 3304f24..e76484e 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
@@ -30,34 +30,22 @@ public class FloatParser extends FieldParser<Float> {
private float result;
@Override
- public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Float
- reusable) {
-
- int i = startPos;
-
- final int delimLimit = limit - delimiter.length + 1;
-
- while (i < limit) {
- if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
- if (i == startPos) {
- setErrorState(ParseErrorState.EMPTY_COLUMN);
- return -1;
- }
- break;
- }
- i++;
+ public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Float reusable) {
+ final int endPos = nextStringEndPos(bytes, startPos, limit, delimiter);
+ if (endPos < 0) {
+ return -1;
}
- if (i > startPos &&
- (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[i - 1]))) {
+ if (endPos > startPos &&
+ (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[endPos - 1]))) {
setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);
return -1;
}
- String str = new String(bytes, startPos, i - startPos);
+ String str = new String(bytes, startPos, endPos - startPos);
try {
this.result = Float.parseFloat(str);
- return (i == limit) ? limit : i + delimiter.length;
+ return (endPos == limit) ? limit : endPos + delimiter.length;
} catch (NumberFormatException e) {
setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR);
return -1;
@@ -83,7 +71,7 @@ public class FloatParser extends FieldParser<Float> {
* @param startPos The offset to start the parsing.
* @param length The length of the byte sequence (counting from the offset).
* @return The parsed value.
- * @throws NumberFormatException Thrown when the value cannot be parsed because the text
+ * @throws IllegalArgumentException Thrown when the value cannot be parsed because the text
* represents not a correct number.
*/
public static final float parseField(byte[] bytes, int startPos, int length) {
@@ -100,26 +88,18 @@ public class FloatParser extends FieldParser<Float> {
* @param length The length of the byte sequence (counting from the offset).
* @param delimiter The delimiter that terminates the field.
* @return The parsed value.
- * @throws NumberFormatException Thrown when the value cannot be parsed because the text
+ * @throws IllegalArgumentException Thrown when the value cannot be parsed because the text
* represents not a correct number.
*/
public static final float parseField(byte[] bytes, int startPos, int length, char delimiter) {
- if (length <= 0) {
- throw new NumberFormatException("Invalid input: Empty string");
- }
- int i = 0;
- final byte delByte = (byte) delimiter;
-
- while (i < length && bytes[startPos + i] != delByte) {
- i++;
- }
+ final int limitedLen = nextStringLength(bytes, startPos, length, delimiter);
- if (i > 0 &&
- (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[startPos + i - 1]))) {
+ if (limitedLen > 0 &&
+ (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[startPos + limitedLen - 1]))) {
throw new NumberFormatException("There is leading or trailing whitespace in the numeric field.");
}
- String str = new String(bytes, startPos, i);
+ final String str = new String(bytes, startPos, limitedLen);
return Float.parseFloat(str);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
index 26ee47b..a834f22 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
@@ -32,34 +32,23 @@ public class FloatValueParser extends FieldParser<FloatValue> {
@Override
public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, FloatValue reusable) {
-
- int i = startPos;
-
- final int delimLimit = limit - delimiter.length + 1;
-
- while (i < limit) {
- if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
- if (i == startPos) {
- setErrorState(ParseErrorState.EMPTY_COLUMN);
- return -1;
- }
- break;
- }
- i++;
+ final int endPos = nextStringEndPos(bytes, startPos, limit, delimiter);
+ if (endPos < 0) {
+ return -1;
}
-
- if (i > startPos &&
- (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[i - 1]))) {
+
+ if (endPos > startPos &&
+ (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[endPos - 1]))) {
setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);
return -1;
}
- String str = new String(bytes, startPos, i - startPos);
+ String str = new String(bytes, startPos, endPos - startPos);
try {
float value = Float.parseFloat(str);
reusable.setValue(value);
this.result = reusable;
- return (i == limit) ? limit : i + delimiter.length;
+ return (endPos == limit) ? limit : endPos + delimiter.length;
}
catch (NumberFormatException e) {
setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR);
http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/main/java/org/apache/flink/types/parser/SqlDateParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/SqlDateParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/SqlDateParser.java
new file mode 100644
index 0000000..859dcf8
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/SqlDateParser.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.types.parser;
+
+import java.sql.Date;
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Parses a text field into a {@link java.sql.Date}.
+ */
+@PublicEvolving
+public class SqlDateParser extends FieldParser<Date> {
+
+ private static final Date DATE_INSTANCE = new Date(0L);
+
+ private Date result;
+
+ @Override
+ public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Date reusable) {
+ final int endPos = nextStringEndPos(bytes, startPos, limit, delimiter);
+ if (endPos < 0) {
+ return -1;
+ }
+
+ if (endPos > startPos &&
+ (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[(endPos - 1)]))) {
+ setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);
+ return -1;
+ }
+
+ String str = new String(bytes, startPos, endPos - startPos);
+ try {
+ this.result = Date.valueOf(str);
+ return (endPos == limit) ? limit : endPos + delimiter.length;
+ } catch (IllegalArgumentException e) {
+ setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR);
+ return -1;
+ }
+ }
+
+ @Override
+ public Date createValue() {
+ return DATE_INSTANCE;
+ }
+
+ @Override
+ public Date getLastResult() {
+ return this.result;
+ }
+
+ /**
+ * Static utility to parse a field of type Date from a byte sequence that represents text
+ * characters
+ * (such as when read from a file stream).
+ *
+ * @param bytes The bytes containing the text data that should be parsed.
+ * @param startPos The offset to start the parsing.
+ * @param length The length of the byte sequence (counting from the offset).
+ * @return The parsed value.
+ * @throws IllegalArgumentException Thrown when the value cannot be parsed because the text
+ * represents not a correct number.
+ */
+ public static final Date parseField(byte[] bytes, int startPos, int length) {
+ return parseField(bytes, startPos, length, (char) 0xffff);
+ }
+
+ /**
+ * Static utility to parse a field of type Date from a byte sequence that represents text
+ * characters
+ * (such as when read from a file stream).
+ *
+ * @param bytes The bytes containing the text data that should be parsed.
+ * @param startPos The offset to start the parsing.
+ * @param length The length of the byte sequence (counting from the offset).
+ * @param delimiter The delimiter that terminates the field.
+ * @return The parsed value.
+ * @throws IllegalArgumentException Thrown when the value cannot be parsed because the text
+ * represents not a correct number.
+ */
+ public static final Date parseField(byte[] bytes, int startPos, int length, char delimiter) {
+ final int limitedLen = nextStringLength(bytes, startPos, length, delimiter);
+
+ if (limitedLen > 0 &&
+ (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[startPos + limitedLen - 1]))) {
+ throw new NumberFormatException("There is leading or trailing whitespace in the numeric field.");
+ }
+
+ final String str = new String(bytes, startPos, limitedLen);
+ return Date.valueOf(str);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimeParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimeParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimeParser.java
new file mode 100644
index 0000000..fbddadc
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimeParser.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.types.parser;
+
+import java.sql.Time;
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Parses a text field into a {@link Time}.
+ */
+@PublicEvolving
+public class SqlTimeParser extends FieldParser<Time> {
+
+ private static final Time TIME_INSTANCE = new Time(0L);
+
+ private Time result;
+
+ @Override
+ public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Time reusable) {
+ final int endPos = nextStringEndPos(bytes, startPos, limit, delimiter);
+ if (endPos < 0) {
+ return -1;
+ }
+
+ String str = new String(bytes, startPos, endPos - startPos);
+ try {
+ this.result = Time.valueOf(str);
+ return (endPos == limit) ? limit : endPos + delimiter.length;
+ } catch (IllegalArgumentException e) {
+ setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR);
+ return -1;
+ }
+ }
+
+ @Override
+ public Time createValue() {
+ return TIME_INSTANCE;
+ }
+
+ @Override
+ public Time getLastResult() {
+ return this.result;
+ }
+
+ /**
+ * Static utility to parse a field of type Time from a byte sequence that represents text
+ * characters
+ * (such as when read from a file stream).
+ *
+ * @param bytes The bytes containing the text data that should be parsed.
+ * @param startPos The offset to start the parsing.
+ * @param length The length of the byte sequence (counting from the offset).
+ * @return The parsed value.
+ * @throws IllegalArgumentException Thrown when the value cannot be parsed because the text
+ * represents not a correct number.
+ */
+ public static final Time parseField(byte[] bytes, int startPos, int length) {
+ return parseField(bytes, startPos, length, (char) 0xffff);
+ }
+
+ /**
+ * Static utility to parse a field of type Time from a byte sequence that represents text
+ * characters
+ * (such as when read from a file stream).
+ *
+ * @param bytes The bytes containing the text data that should be parsed.
+ * @param startPos The offset to start the parsing.
+ * @param length The length of the byte sequence (counting from the offset).
+ * @param delimiter The delimiter that terminates the field.
+ * @return The parsed value.
+ * @throws IllegalArgumentException Thrown when the value cannot be parsed because the text
+ * represents not a correct number.
+ */
+ public static final Time parseField(byte[] bytes, int startPos, int length, char delimiter) {
+ final int limitedLen = nextStringLength(bytes, startPos, length, delimiter);
+
+ if (limitedLen > 0 &&
+ (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[startPos + limitedLen - 1]))) {
+ throw new NumberFormatException("There is leading or trailing whitespace in the numeric field.");
+ }
+
+ final String str = new String(bytes, startPos, limitedLen);
+ return Time.valueOf(str);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimestampParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimestampParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimestampParser.java
new file mode 100644
index 0000000..0bcb602
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/SqlTimestampParser.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.types.parser;
+
+import java.sql.Timestamp;
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Parses a text field into a {@link Timestamp}. Fields with leading or trailing whitespace are rejected.
+ */
+@PublicEvolving
+public class SqlTimestampParser extends FieldParser<Timestamp> {
+
+ private static final Timestamp TIMESTAMP_INSTANCE = new Timestamp(0L); // shared value returned by createValue()
+
+ private Timestamp result; // set only by a successful parseField call
+
+ @Override
+ public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Timestamp reusable) {
+ final int endPos = nextStringEndPos(bytes, startPos, limit, delimiter);
+ if (endPos < 0) {
+ return -1; // negative end position: the field boundary could not be determined
+ }
+
+ if (endPos > startPos &&
+ (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[(endPos - 1)]))) {
+ setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER); // padded fields are rejected, not trimmed
+ return -1;
+ }
+
+ // Explicit charset: the decoded text must not depend on the JVM's platform default encoding.
+ String str = new String(bytes, startPos, endPos - startPos, java.nio.charset.StandardCharsets.UTF_8);
+ try {
+ this.result = Timestamp.valueOf(str);
+ return (endPos == limit) ? limit : endPos + delimiter.length; // step over the delimiter unless the field ends at limit
+ } catch (IllegalArgumentException e) {
+ setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR);
+ return -1;
+ }
+ }
+
+ @Override
+ public Timestamp createValue() {
+ return TIMESTAMP_INSTANCE; // parseField never writes into its 'reusable' argument
+ }
+
+ @Override
+ public Timestamp getLastResult() {
+ return this.result;
+ }
+
+ /**
+ * Static utility to parse a field of type Timestamp from a byte sequence that represents text
+ * characters (such as when read from a file stream).
+ *
+ * @param bytes The bytes containing the text data that should be parsed.
+ * @param startPos The offset to start the parsing.
+ * @param length The length of the byte sequence (counting from the offset).
+ * @return The parsed value.
+ * @throws IllegalArgumentException Thrown when the text is not a valid timestamp or is padded
+ * with whitespace.
+ */
+ public static final Timestamp parseField(byte[] bytes, int startPos, int length) {
+ return parseField(bytes, startPos, length, (char) 0xffff);
+ }
+
+ /**
+ * Static utility to parse a field of type Timestamp from a byte sequence that represents text
+ * characters (such as when read from a file stream).
+ *
+ * @param bytes The bytes containing the text data that should be parsed.
+ * @param startPos The offset to start the parsing.
+ * @param length The length of the byte sequence (counting from the offset).
+ * @param delimiter The delimiter that terminates the field.
+ * @return The parsed value.
+ * @throws IllegalArgumentException Thrown when the text is not a valid timestamp; a
+ * {@link NumberFormatException} (a subclass) is thrown for leading or trailing whitespace.
+ */
+ public static final Timestamp parseField(byte[] bytes, int startPos, int length, char delimiter) {
+ final int limitedLen = nextStringLength(bytes, startPos, length, delimiter);
+
+ if (limitedLen > 0 &&
+ (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[startPos + limitedLen - 1]))) {
+ throw new NumberFormatException("There is leading or trailing whitespace in the timestamp field.");
+ }
+
+ // Explicit charset: the decoded text must not depend on the JVM's platform default encoding.
+ final String str = new String(bytes, startPos, limitedLen, java.nio.charset.StandardCharsets.UTF_8);
+ return Timestamp.valueOf(str);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimeComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimeComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimeComparatorTest.java
index 2b5cfdf..8fb3319 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimeComparatorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimeComparatorTest.java
@@ -40,7 +40,7 @@ public class SqlTimeComparatorTest extends ComparatorTestBase<Time> {
protected Time[] getSortedTestData() {
return new Time[] {
Time.valueOf("00:00:00"),
- Time.valueOf("02:42:85"),
+ Time.valueOf("02:42:25"),
Time.valueOf("14:15:59"),
Time.valueOf("18:00:45")
};
http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimeSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimeSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimeSerializerTest.java
index 4d16050..bfac789 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimeSerializerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimeSerializerTest.java
@@ -47,7 +47,7 @@ public class SqlTimeSerializerTest extends SerializerTestBase<Time> {
return new Time[] {
new Time(0L),
Time.valueOf("00:00:00"),
- Time.valueOf("02:42:85"),
+ Time.valueOf("02:42:25"),
Time.valueOf("14:15:59"),
Time.valueOf("18:00:45")
};
http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimestampComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimestampComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimestampComparatorTest.java
index 0b8d294..e182d0a 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimestampComparatorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimestampComparatorTest.java
@@ -40,9 +40,9 @@ public class SqlTimestampComparatorTest extends ComparatorTestBase<Timestamp> {
protected Timestamp[] getSortedTestData() {
return new Timestamp[] {
Timestamp.valueOf("1970-01-01 00:00:00.000"),
- Timestamp.valueOf("1990-10-14 02:42:85.123"),
- Timestamp.valueOf("1990-10-14 02:42:85.123000001"),
- Timestamp.valueOf("1990-10-14 02:42:85.123000002"),
+ Timestamp.valueOf("1990-10-14 02:42:25.123"),
+ Timestamp.valueOf("1990-10-14 02:42:25.123000001"),
+ Timestamp.valueOf("1990-10-14 02:42:25.123000002"),
Timestamp.valueOf("2013-08-12 14:15:59.478"),
Timestamp.valueOf("2013-08-12 14:15:59.479"),
Timestamp.valueOf("2040-05-12 18:00:45.999")
http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimestampSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimestampSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimestampSerializerTest.java
index 70172d5..e825eaa 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimestampSerializerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlTimestampSerializerTest.java
@@ -47,9 +47,9 @@ public class SqlTimestampSerializerTest extends SerializerTestBase<Timestamp> {
return new Timestamp[] {
new Timestamp(0L),
Timestamp.valueOf("1970-01-01 00:00:00.000"),
- Timestamp.valueOf("1990-10-14 02:42:85.123"),
- Timestamp.valueOf("1990-10-14 02:42:85.123000001"),
- Timestamp.valueOf("1990-10-14 02:42:85.123000002"),
+ Timestamp.valueOf("1990-10-14 02:42:25.123"),
+ Timestamp.valueOf("1990-10-14 02:42:25.123000001"),
+ Timestamp.valueOf("1990-10-14 02:42:25.123000002"),
Timestamp.valueOf("2013-08-12 14:15:59.478"),
Timestamp.valueOf("2013-08-12 14:15:59.479"),
Timestamp.valueOf("2040-05-12 18:00:45.999")
http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/test/java/org/apache/flink/types/parser/SqlDateParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/SqlDateParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/SqlDateParserTest.java
new file mode 100644
index 0000000..25015cd
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/SqlDateParserTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.types.parser;
+
+
+import java.sql.Date;
+
+public class SqlDateParserTest extends ParserTestBase<Date> { // test vectors for SqlDateParser (see getParser)
+
+ @Override
+ public String[] getValidTestValues() {
+ return new String[] { // includes month and day written without zero padding
+ "1970-01-01", "1990-10-14", "2013-08-12", "2040-05-12", "2040-5-12", "1970-1-1",
+ };
+ }
+
+ @Override
+ public Date[] getValidTestResults() {
+ return new Date[] { // same order as getValidTestValues(); presumably matched index-by-index by ParserTestBase
+ Date.valueOf("1970-01-01"), Date.valueOf("1990-10-14"), Date.valueOf("2013-08-12"),
+ Date.valueOf("2040-05-12"), Date.valueOf("2040-05-12"), Date.valueOf("1970-01-01")
+ };
+ }
+
+ @Override
+ public String[] getInvalidTestValues() {
+ return new String[] { // whitespace padding, bad separators, two-digit year, non-digits, day out of range
+ " 2013-08-12", "2013-08-12 ", "2013-08--12", "13-08-12", "2013/08/12", " ", "\t",
+ "2013-XX-XX", "2000-02-35"
+ };
+ }
+
+ @Override
+ public boolean allowsEmptyField() {
+ return false; // presumably tells ParserTestBase that an empty field must not parse -- confirm
+ }
+
+ @Override
+ public FieldParser<Date> getParser() {
+ return new SqlDateParser();
+ }
+
+ @Override
+ public Class<Date> getTypeClass() {
+ return Date.class;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/test/java/org/apache/flink/types/parser/SqlTimeParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/SqlTimeParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/SqlTimeParserTest.java
new file mode 100644
index 0000000..06ebd3d
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/SqlTimeParserTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.types.parser;
+
+
+import java.sql.Time;
+
+public class SqlTimeParserTest extends ParserTestBase<Time> { // test vectors for SqlTimeParser (see getParser)
+
+ @Override
+ public String[] getValidTestValues() {
+ return new String[] { // the last entry omits zero padding
+ "00:00:00", "02:42:25", "14:15:51", "18:00:45", "23:59:58", "0:0:0"
+ };
+ }
+
+ @Override
+ public Time[] getValidTestResults() {
+ return new Time[] { // same order as getValidTestValues(); presumably matched index-by-index by ParserTestBase
+ Time.valueOf("00:00:00"), Time.valueOf("02:42:25"), Time.valueOf("14:15:51"),
+ Time.valueOf("18:00:45"), Time.valueOf("23:59:58"), Time.valueOf("0:0:0")
+ };
+ }
+
+ @Override
+ public String[] getInvalidTestValues() {
+ return new String[] { // whitespace padding, doubled or wrong separators, a date instead of a time
+ " 00:00:00", "00:00:00 ", "00:00::00", "00x00:00", "2013/08/12", " ", "\t"
+ };
+ }
+
+ @Override
+ public boolean allowsEmptyField() {
+ return false; // presumably tells ParserTestBase that an empty field must not parse -- confirm
+ }
+
+ @Override
+ public FieldParser<Time> getParser() {
+ return new SqlTimeParser();
+ }
+
+ @Override
+ public Class<Time> getTypeClass() {
+ return Time.class;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-core/src/test/java/org/apache/flink/types/parser/SqlTimestampParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/SqlTimestampParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/SqlTimestampParserTest.java
new file mode 100644
index 0000000..0527606
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/SqlTimestampParserTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.types.parser;
+
+
+import java.sql.Timestamp;
+
+public class SqlTimestampParserTest extends ParserTestBase<Timestamp> { // test vectors for SqlTimestampParser (see getParser)
+
+ @Override
+ public String[] getValidTestValues() {
+ return new String[] { // covers no fraction, 2-, 3- and 9-digit fractions, and year 0000
+ "1970-01-01 00:00:00.000", "1990-10-14 02:42:25", "1990-10-14 02:42:25.123", "1990-10-14 02:42:25.123000001",
+ "1990-10-14 02:42:25.123000002", "2013-08-12 14:15:59.478", "2013-08-12 14:15:59.47",
+ "0000-01-01 00:00:00.000",
+ };
+ }
+
+ @Override
+ public Timestamp[] getValidTestResults() {
+ return new Timestamp[] { // same order as getValidTestValues(); presumably matched index-by-index by ParserTestBase
+ Timestamp.valueOf("1970-01-01 00:00:00.000"), Timestamp.valueOf("1990-10-14 02:42:25"), Timestamp.valueOf("1990-10-14 02:42:25.123"),
+ Timestamp.valueOf("1990-10-14 02:42:25.123000001"), Timestamp.valueOf("1990-10-14 02:42:25.123000002"),
+ Timestamp.valueOf("2013-08-12 14:15:59.478"), Timestamp.valueOf("2013-08-12 14:15:59.47"),
+ Timestamp.valueOf("0000-01-01 00:00:00.000")
+ };
+ }
+
+ @Override
+ public String[] getInvalidTestValues() {
+ return new String[] { // whitespace padding, bad separators, date-only or time-like text, non-digit or over-long fraction
+ " 2013-08-12 14:15:59.479", "2013-08-12 14:15:59.479 ", "1970-01-01 00:00::00",
+ "00x00:00", "2013/08/12", "0000-01-01 00:00:00.f00", "2013-08-12 14:15:59.4788888888888888",
+ " ", "\t"
+ };
+ }
+
+ @Override
+ public boolean allowsEmptyField() {
+ return false; // presumably tells ParserTestBase that an empty field must not parse -- confirm
+ }
+
+ @Override
+ public FieldParser<Timestamp> getParser() {
+ return new SqlTimestampParser();
+ }
+
+ @Override
+ public Class<Timestamp> getTypeClass() {
+ return Timestamp.class;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/3507d59f/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala
index db01b69..d176b79 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala
@@ -20,9 +20,10 @@ package org.apache.flink.api.table.runtime.io
import java.io.{File, FileOutputStream, OutputStreamWriter}
import java.nio.charset.StandardCharsets
+import java.sql.{Date, Time, Timestamp}
import org.apache.flink.api.common.io.ParseException
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo}
import org.apache.flink.api.table.Row
import org.apache.flink.api.table.runtime.io.RowCsvInputFormatTest.{PATH, createTempFile, testRemovingTrailingCR}
import org.apache.flink.api.table.typeutils.RowTypeInfo
@@ -786,6 +787,45 @@ class RowCsvInputFormatTest {
assertEquals("\\\"Hello\\\" World", record.productElement(0))
assertEquals("We are\\\" young", record.productElement(1))
}
+
+ @Test
+ def testSqlTimeFields() {
+ val fileContent = "1990-10-14|02:42:25|1990-10-14 02:42:25.123|1990-1-4 2:2:5\n" + // two rows; the 4th column omits zero padding
+ "1990-10-14|02:42:25|1990-10-14 02:42:25.123|1990-1-4 2:2:5.3\n"
+
+ val split = createTempFile(fileContent)
+
+ val typeInfo = new RowTypeInfo(Seq( // one SQL time type per column, in file order
+ SqlTimeTypeInfo.DATE,
+ SqlTimeTypeInfo.TIME,
+ SqlTimeTypeInfo.TIMESTAMP,
+ SqlTimeTypeInfo.TIMESTAMP))
+
+ val format = new RowCsvInputFormat(PATH, rowTypeInfo = typeInfo)
+ format.setFieldDelimiter("|") // matches the '|' separators used in fileContent
+ format.configure(new Configuration)
+ format.open(split)
+
+ var result = new Row(4)
+
+ result = format.nextRecord(result)
+ assertNotNull(result)
+ assertEquals(Date.valueOf("1990-10-14"), result.productElement(0))
+ assertEquals(Time.valueOf("02:42:25"), result.productElement(1))
+ assertEquals(Timestamp.valueOf("1990-10-14 02:42:25.123"), result.productElement(2))
+ assertEquals(Timestamp.valueOf("1990-01-04 02:02:05"), result.productElement(3)) // unpadded input equals the padded form
+
+ result = format.nextRecord(result)
+ assertNotNull(result)
+ assertEquals(Date.valueOf("1990-10-14"), result.productElement(0))
+ assertEquals(Time.valueOf("02:42:25"), result.productElement(1))
+ assertEquals(Timestamp.valueOf("1990-10-14 02:42:25.123"), result.productElement(2))
+ assertEquals(Timestamp.valueOf("1990-01-04 02:02:05.3"), result.productElement(3))
+
+ result = format.nextRecord(result) // third read: fileContent holds only two rows
+ assertNull(result)
+ assertTrue(format.reachedEnd)
+ }
}
object RowCsvInputFormatTest {