You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/22 06:29:23 UTC

[flink] branch release-1.9 updated: [FLINK-13284][table-planner-blink] Correct some builtin functions' return type inference in Blink planner

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 694b208  [FLINK-13284][table-planner-blink] Correct some builtin functions' return type inference in Blink planner
694b208 is described below

commit 694b20874d54f76a86eb81eb7e1346833500f798
Author: lincoln-lil <li...@gmail.com>
AuthorDate: Wed Jul 17 15:22:43 2019 +0800

    [FLINK-13284][table-planner-blink] Correct some builtin functions' return type inference in Blink planner
    
    This closes #9146
---
 .../table/functions/sql/FlinkSqlOperatorTable.java | 10 +++---
 .../table/expressions/ScalarFunctionsTest.scala    |  1 +
 .../table/expressions/TemporalTypesTest.scala      | 41 ++++++++++++++++++++--
 .../table/runtime/functions/SqlDateTimeUtils.java  |  6 +++-
 4 files changed, 49 insertions(+), 9 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/FlinkSqlOperatorTable.java
index d3d2e5a..31bf0f4 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/FlinkSqlOperatorTable.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/FlinkSqlOperatorTable.java
@@ -570,7 +570,7 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
 	public static final SqlFunction DATE_FORMAT = new SqlFunction(
 		"DATE_FORMAT",
 		SqlKind.OTHER_FUNCTION,
-		ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.VARCHAR), SqlTypeTransforms.TO_NULLABLE),
+		ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.VARCHAR), SqlTypeTransforms.FORCE_NULLABLE),
 		InferTypes.RETURN_TYPE,
 		OperandTypes.or(
 			OperandTypes.sequence("'(TIMESTAMP, FORMAT)'",
@@ -792,7 +792,7 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
 	public static final SqlFunction TO_TIMESTAMP = new SqlFunction(
 		"TO_TIMESTAMP",
 		SqlKind.OTHER_FUNCTION,
-		ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.TIMESTAMP), SqlTypeTransforms.TO_NULLABLE),
+		ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.TIMESTAMP), SqlTypeTransforms.FORCE_NULLABLE),
 		null,
 		OperandTypes.or(
 			OperandTypes.family(SqlTypeFamily.NUMERIC),
@@ -811,7 +811,7 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
 	public static final SqlFunction TO_DATE = new SqlFunction(
 		"TO_DATE",
 		SqlKind.OTHER_FUNCTION,
-		ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.DATE), SqlTypeTransforms.TO_NULLABLE),
+		ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.DATE), SqlTypeTransforms.FORCE_NULLABLE),
 		null,
 		OperandTypes.or(
 			OperandTypes.family(SqlTypeFamily.NUMERIC),
@@ -822,7 +822,7 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
 	public static final SqlFunction TO_TIMESTAMP_TZ = new SqlFunction(
 		"TO_TIMESTAMP_TZ",
 		SqlKind.OTHER_FUNCTION,
-		ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.TIMESTAMP), SqlTypeTransforms.TO_NULLABLE),
+		ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.TIMESTAMP), SqlTypeTransforms.FORCE_NULLABLE),
 		null,
 		OperandTypes.or(
 			OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.STRING),
@@ -842,7 +842,7 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
 	public static final SqlFunction CONVERT_TZ = new SqlFunction(
 		"CONVERT_TZ",
 		SqlKind.OTHER_FUNCTION,
-		ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.VARCHAR), SqlTypeTransforms.TO_NULLABLE),
+		ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.VARCHAR), SqlTypeTransforms.FORCE_NULLABLE),
 		null,
 		OperandTypes.or(
 			OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.STRING, SqlTypeFamily.STRING),
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
index f05314b..71c6c01 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
@@ -2556,6 +2556,7 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
 
   @Test
   def testToTimestamp(): Unit = {
+    testSqlApi("to_timestamp('abc')", "null")
     testSqlApi("to_timestamp(1513135677000)", "2017-12-13 03:27:57.000")
     testSqlApi("to_timestamp('2017-09-15 00:00:00')", "2017-09-15 00:00:00.000")
     testSqlApi("to_timestamp('20170915000000', 'yyyyMMddHHmmss')", "2017-09-15 00:00:00.000")
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/TemporalTypesTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/TemporalTypesTest.scala
index a43338d..4d0cf1d 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/TemporalTypesTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/TemporalTypesTest.scala
@@ -642,16 +642,52 @@ class TemporalTypesTest extends ExpressionTestBase {
 
     testSqlApi("FROM_UNIXTIME(cast(NUll as bigInt))", nullable)
 
+    testSqlApi("TO_DATE(cast(NUll as varchar))", nullable)
+
+    testSqlApi("TO_TIMESTAMP_TZ(cast(NUll as varchar), 'Asia/Shanghai')", nullable)
+
+    testSqlApi(
+      "DATE_FORMAT_TZ(cast(NUll as timestamp), 'yyyy/MM/dd HH:mm:ss', 'Asia/Shanghai')",
+      nullable)
+  }
+
+  @Test
+  def testInvalidInputCase(): Unit = {
+    val invalidStr = "invalid value"
+    testSqlApi(s"DATE_FORMAT('$invalidStr', 'yyyy/MM/dd HH:mm:ss')", nullable)
+    testSqlApi(s"TO_TIMESTAMP('$invalidStr', 'yyyy-mm-dd')", nullable)
+    testSqlApi(s"TO_DATE('$invalidStr')", nullable)
+    testSqlApi(s"TO_TIMESTAMP_TZ('$invalidStr', 'Asia/Shanghai')", nullable)
+    testSqlApi(
+      s"CONVERT_TZ('$invalidStr', 'yyyy-MM-dd HH:mm:ss', 'UTC', 'Asia/Shanghai')",
+      nullable)
+  }
+
+  @Test
+  def testTypeInferenceWithInvalidInput(): Unit = {
+    val invalidStr = "invalid value"
+    val cases = Seq(
+      s"DATE_FORMAT('$invalidStr', 'yyyy/MM/dd HH:mm:ss')",
+      s"TO_TIMESTAMP('$invalidStr', 'yyyy-mm-dd')",
+      s"TO_DATE('$invalidStr')",
+      s"TO_TIMESTAMP_TZ('$invalidStr', 'Asia/Shanghai')",
+      s"CONVERT_TZ('$invalidStr', 'yyyy-MM-dd HH:mm:ss', 'UTC', 'Asia/Shanghai')")
+
+    cases.foreach {
+      caseExpr =>
+        testSqlApi(
+          s"CASE WHEN ($caseExpr) is null THEN '$nullable' ELSE '$notNullable' END", nullable)
+    }
   }
 
   @Test
   def testTimeZoneFunction(): Unit = {
     testSqlApi("TO_TIMESTAMP_TZ('2018-03-14 11:00:00', 'Asia/Shanghai')", "2018-03-14 03:00:00.000")
     testSqlApi("TO_TIMESTAMP_TZ('2018-03-14 11:00:00', 'yyyy-MM-dd HH:mm:ss', 'Asia/Shanghai')",
-      "2018-03-14 03:00:00.000")
+               "2018-03-14 03:00:00.000")
 
     testSqlApi("CONVERT_TZ('2018-03-14 11:00:00', 'yyyy-MM-dd HH:mm:ss', 'UTC', 'Asia/Shanghai')",
-      "2018-03-14 19:00:00")
+               "2018-03-14 19:00:00")
 
     testSqlApi("TO_TIMESTAMP_TZ(f14, 'UTC')", "null")
 
@@ -661,7 +697,6 @@ class TemporalTypesTest extends ExpressionTestBase {
     testSqlApi("TO_TIMESTAMP_TZ('2018-03-14 11:00:00', 'invalid_tz')", "2018-03-14 11:00:00.000")
   }
 
-
   // ----------------------------------------------------------------------------------------------
 
   override def testData: Row = {
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlDateTimeUtils.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlDateTimeUtils.java
index a63efde..118dca7 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlDateTimeUtils.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlDateTimeUtils.java
@@ -362,7 +362,11 @@ public class SqlDateTimeUtils {
 	 * @param tzTo the target time zone
 	 */
 	public static String convertTz(String dateStr, String format, String tzFrom, String tzTo) {
-		return dateFormatTz(toTimestampTz(dateStr, format, tzFrom), tzTo);
+		Long ts = toTimestampTz(dateStr, format, tzFrom);
+		if (null != ts) { // ts may be null (presumably when dateStr cannot be parsed); return null instead of hitting an NPE
+			return dateFormatTz(ts, tzTo);
+		}
+		return null;
 	}
 
 	public static String convertTz(String dateStr, String tzFrom, String tzTo) {