You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/22 06:29:23 UTC
[flink] branch release-1.9 updated:
[FLINK-13284][table-planner-blink] Correct some builtin functions' return
type inference in Blink planner
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new 694b208 [FLINK-13284][table-planner-blink] Correct some builtin functions' return type inference in Blink planner
694b208 is described below
commit 694b20874d54f76a86eb81eb7e1346833500f798
Author: lincoln-lil <li...@gmail.com>
AuthorDate: Wed Jul 17 15:22:43 2019 +0800
[FLINK-13284][table-planner-blink] Correct some builtin functions' return type inference in Blink planner
This closes #9146
---
.../table/functions/sql/FlinkSqlOperatorTable.java | 10 +++---
.../table/expressions/ScalarFunctionsTest.scala | 1 +
.../table/expressions/TemporalTypesTest.scala | 41 ++++++++++++++++++++--
.../table/runtime/functions/SqlDateTimeUtils.java | 6 +++-
4 files changed, 49 insertions(+), 9 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/FlinkSqlOperatorTable.java
index d3d2e5a..31bf0f4 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/FlinkSqlOperatorTable.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/FlinkSqlOperatorTable.java
@@ -570,7 +570,7 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
public static final SqlFunction DATE_FORMAT = new SqlFunction(
"DATE_FORMAT",
SqlKind.OTHER_FUNCTION,
- ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.VARCHAR), SqlTypeTransforms.TO_NULLABLE),
+ ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.VARCHAR), SqlTypeTransforms.FORCE_NULLABLE),
InferTypes.RETURN_TYPE,
OperandTypes.or(
OperandTypes.sequence("'(TIMESTAMP, FORMAT)'",
@@ -792,7 +792,7 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
public static final SqlFunction TO_TIMESTAMP = new SqlFunction(
"TO_TIMESTAMP",
SqlKind.OTHER_FUNCTION,
- ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.TIMESTAMP), SqlTypeTransforms.TO_NULLABLE),
+ ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.TIMESTAMP), SqlTypeTransforms.FORCE_NULLABLE),
null,
OperandTypes.or(
OperandTypes.family(SqlTypeFamily.NUMERIC),
@@ -811,7 +811,7 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
public static final SqlFunction TO_DATE = new SqlFunction(
"TO_DATE",
SqlKind.OTHER_FUNCTION,
- ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.DATE), SqlTypeTransforms.TO_NULLABLE),
+ ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.DATE), SqlTypeTransforms.FORCE_NULLABLE),
null,
OperandTypes.or(
OperandTypes.family(SqlTypeFamily.NUMERIC),
@@ -822,7 +822,7 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
public static final SqlFunction TO_TIMESTAMP_TZ = new SqlFunction(
"TO_TIMESTAMP_TZ",
SqlKind.OTHER_FUNCTION,
- ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.TIMESTAMP), SqlTypeTransforms.TO_NULLABLE),
+ ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.TIMESTAMP), SqlTypeTransforms.FORCE_NULLABLE),
null,
OperandTypes.or(
OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.STRING),
@@ -842,7 +842,7 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
public static final SqlFunction CONVERT_TZ = new SqlFunction(
"CONVERT_TZ",
SqlKind.OTHER_FUNCTION,
- ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.VARCHAR), SqlTypeTransforms.TO_NULLABLE),
+ ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.VARCHAR), SqlTypeTransforms.FORCE_NULLABLE),
null,
OperandTypes.or(
OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.STRING, SqlTypeFamily.STRING),
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
index f05314b..71c6c01 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
@@ -2556,6 +2556,7 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
@Test
def testToTimestamp(): Unit = {
+ testSqlApi("to_timestamp('abc')", "null")
testSqlApi("to_timestamp(1513135677000)", "2017-12-13 03:27:57.000")
testSqlApi("to_timestamp('2017-09-15 00:00:00')", "2017-09-15 00:00:00.000")
testSqlApi("to_timestamp('20170915000000', 'yyyyMMddHHmmss')", "2017-09-15 00:00:00.000")
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/TemporalTypesTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/TemporalTypesTest.scala
index a43338d..4d0cf1d 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/TemporalTypesTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/TemporalTypesTest.scala
@@ -642,16 +642,52 @@ class TemporalTypesTest extends ExpressionTestBase {
testSqlApi("FROM_UNIXTIME(cast(NUll as bigInt))", nullable)
+ testSqlApi("TO_DATE(cast(NUll as varchar))", nullable)
+
+ testSqlApi("TO_TIMESTAMP_TZ(cast(NUll as varchar), 'Asia/Shanghai')", nullable)
+
+ testSqlApi(
+ "DATE_FORMAT_TZ(cast(NUll as timestamp), 'yyyy/MM/dd HH:mm:ss', 'Asia/Shanghai')",
+ nullable)
+ }
+
+ @Test
+ def testInvalidInputCase(): Unit = {
+ val invalidStr = "invalid value"
+ testSqlApi(s"DATE_FORMAT('$invalidStr', 'yyyy/MM/dd HH:mm:ss')", nullable)
+ testSqlApi(s"TO_TIMESTAMP('$invalidStr', 'yyyy-mm-dd')", nullable)
+ testSqlApi(s"TO_DATE('$invalidStr')", nullable)
+ testSqlApi(s"TO_TIMESTAMP_TZ('$invalidStr', 'Asia/Shanghai')", nullable)
+ testSqlApi(
+ s"CONVERT_TZ('$invalidStr', 'yyyy-MM-dd HH:mm:ss', 'UTC', 'Asia/Shanghai')",
+ nullable)
+ }
+
+ @Test
+ def testTypeInferenceWithInvalidInput(): Unit = {
+ val invalidStr = "invalid value"
+ val cases = Seq(
+ s"DATE_FORMAT('$invalidStr', 'yyyy/MM/dd HH:mm:ss')",
+ s"TO_TIMESTAMP('$invalidStr', 'yyyy-mm-dd')",
+ s"TO_DATE('$invalidStr')",
+ s"TO_TIMESTAMP_TZ('$invalidStr', 'Asia/Shanghai')",
+ s"CONVERT_TZ('$invalidStr', 'yyyy-MM-dd HH:mm:ss', 'UTC', 'Asia/Shanghai')")
+
+ cases.foreach {
+ caseExpr =>
+ testSqlApi(
+ s"CASE WHEN ($caseExpr) is null THEN '$nullable' ELSE '$notNullable' END", nullable)
+ }
}
@Test
def testTimeZoneFunction(): Unit = {
testSqlApi("TO_TIMESTAMP_TZ('2018-03-14 11:00:00', 'Asia/Shanghai')", "2018-03-14 03:00:00.000")
testSqlApi("TO_TIMESTAMP_TZ('2018-03-14 11:00:00', 'yyyy-MM-dd HH:mm:ss', 'Asia/Shanghai')",
- "2018-03-14 03:00:00.000")
+ "2018-03-14 03:00:00.000")
testSqlApi("CONVERT_TZ('2018-03-14 11:00:00', 'yyyy-MM-dd HH:mm:ss', 'UTC', 'Asia/Shanghai')",
- "2018-03-14 19:00:00")
+ "2018-03-14 19:00:00")
testSqlApi("TO_TIMESTAMP_TZ(f14, 'UTC')", "null")
@@ -661,7 +697,6 @@ class TemporalTypesTest extends ExpressionTestBase {
testSqlApi("TO_TIMESTAMP_TZ('2018-03-14 11:00:00', 'invalid_tz')", "2018-03-14 11:00:00.000")
}
-
// ----------------------------------------------------------------------------------------------
override def testData: Row = {
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlDateTimeUtils.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlDateTimeUtils.java
index a63efde..118dca7 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlDateTimeUtils.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlDateTimeUtils.java
@@ -362,7 +362,11 @@ public class SqlDateTimeUtils {
* @param tzTo the target time zone
*/
public static String convertTz(String dateStr, String format, String tzFrom, String tzTo) {
- return dateFormatTz(toTimestampTz(dateStr, format, tzFrom), tzTo);
+ Long ts = toTimestampTz(dateStr, format, tzFrom);
+ if (null != ts) { // avoid NPE
+ return dateFormatTz(ts, tzTo);
+ }
+ return null;
}
public static String convertTz(String dateStr, String tzFrom, String tzTo) {