You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/08 12:45:23 UTC

[flink] branch release-1.9 updated (448c459 -> d61e606)

This is an automated email from the ASF dual-hosted git repository.

jark pushed a change to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 448c459  [FLINK-13630][table-api-bridge] Use values retention configuration from TableConfig if not specified explicitly
     new 1f90520  [FLINK-13561][table-planner-blink] Remove some builtin datetime functions which can be covered by existing functions
     new 09d0f61  [FLINK-13561][table-planner-blink] Drop DATE_FORMAT(timestamp, from_format, to_format) function support
     new 18bd93b  [FLINK-13561][table-planner-blink] Drop CONVERT_TZ(timestamp, format, from_tz, to_tz) function support
     new 9ac2a9c  [FLINK-13561][table-planner-blink] Drop TO_TIMESTAMP(bigint) function support
     new a58fb83  [FLINK-13561][table-planner-blink] Drop TO_DATE(int) function support
     new 2403448  [FLINK-13561][table-planner-blink] Fix FROM_UNIXTIME(bigint [,format]) should work in session time zone
     new 45d0aef  [FLINK-13561][table-planner-blink] Fix UNIX_TIMESTAMP(string [,format]) should work in session time zone
     new 5ee3485  [FLINK-13561][table-planner-blink] Fix NOW() should return TIMESTAMP instead of BIGINT.
     new d61e606  [FLINK-13561][table-planner-blink] Fix wrong result of TIMESTAMPADD(HOUR, interval, DATE)

The 9 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../functions/sql/FlinkSqlOperatorTable.java       |  88 +-------
 .../flink/table/planner/codegen/CodeGenUtils.scala |   2 -
 .../planner/codegen/CodeGeneratorContext.scala     |  11 -
 .../planner/codegen/calls/BuiltInMethods.scala     |  57 +----
 .../planner/codegen/calls/FunctionGenerator.scala  |  44 ----
 .../planner/codegen/calls/ScalarOperatorGens.scala |  11 +-
 .../planner/codegen/calls/StringCallGen.scala      |  47 ----
 .../planner/expressions/ScalarFunctionsTest.scala  | 124 ++---------
 .../planner/expressions/TemporalTypesTest.scala    | 241 ++++++++++-----------
 .../planner/runtime/batch/sql/CalcITCase.scala     |  50 +----
 .../runtime/stream/sql/CorrelateITCase.scala       |   2 +-
 .../table/runtime/functions/SqlDateTimeUtils.java  |   4 +-
 12 files changed, 158 insertions(+), 523 deletions(-)


[flink] 02/09: [FLINK-13561][table-planner-blink] Drop DATE_FORMAT(timestamp, from_format, to_format) function support

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 09d0f61866d54269a95207426da9b105b38ea1f9
Author: Jark Wu <im...@gmail.com>
AuthorDate: Wed Aug 7 13:40:34 2019 +0800

    [FLINK-13561][table-planner-blink] Drop DATE_FORMAT(timestamp, from_format, to_format) function support
    
    This commit drops DATE_FORMAT(timestamp, from_format, to_format) function support in blink planner to align with other systems. We only support DATE_FORMAT(timestamp, to_format) and DATE_FORMAT(string, to_format) in this version.
---
 .../functions/sql/FlinkSqlOperatorTable.java       |  4 +---
 .../planner/codegen/calls/BuiltInMethods.scala     |  4 ----
 .../planner/codegen/calls/StringCallGen.scala      |  6 -----
 .../planner/expressions/TemporalTypesTest.scala    | 26 ++++++++++------------
 .../planner/runtime/batch/sql/CalcITCase.scala     |  4 +---
 5 files changed, 14 insertions(+), 30 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
index d81b2f1..5219a11 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
@@ -573,9 +573,7 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
 		ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.VARCHAR), SqlTypeTransforms.FORCE_NULLABLE),
 		InferTypes.RETURN_TYPE,
 		OperandTypes.or(
-			OperandTypes.sequence("'(TIMESTAMP, FORMAT)'",
-				OperandTypes.DATETIME, OperandTypes.STRING),
-			OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.STRING, SqlTypeFamily.STRING),
+			OperandTypes.family(SqlTypeFamily.TIMESTAMP, SqlTypeFamily.STRING),
 			OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.STRING)),
 		SqlFunctionCategory.TIMEDATE);
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
index dd91b52..ff4796b 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
@@ -258,10 +258,6 @@ object BuiltInMethods {
   val DATE_FORMAT_LONG_STRING_TIME_ZONE = Types.lookupMethod(
     classOf[SqlDateTimeUtils], "dateFormat", classOf[Long], classOf[String], classOf[TimeZone])
 
-  val DATE_FORMAT_STRING_STRING_STRING = Types.lookupMethod(
-    classOf[SqlDateTimeUtils], "dateFormat", classOf[String],
-    classOf[String], classOf[String])
-
   val DATE_FORMAT_STIRNG_STRING = Types.lookupMethod(
     classOf[SqlDateTimeUtils], "dateFormat", classOf[String], classOf[String])
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
index 676eef6..aac9010 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
@@ -221,12 +221,6 @@ object StringCallGen {
           isCharacterString(operands(1).resultType) =>
         methodGen(BuiltInMethods.DATE_FORMAT_STIRNG_STRING)
 
-      case DATE_FORMAT if operands.size == 3 &&
-          isCharacterString(operands.head.resultType) &&
-          isCharacterString(operands(1).resultType) &&
-          isCharacterString(operands(2).resultType) =>
-        methodGen(BuiltInMethods.DATE_FORMAT_STRING_STRING_STRING)
-
       case CONVERT_TZ if operands.size == 3 &&
           isCharacterString(operands.head.resultType) &&
           isCharacterString(operands(1).resultType) &&
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
index 90bdc92..86175d1 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
@@ -590,11 +590,6 @@ class TemporalTypesTest extends ExpressionTestBase {
     )
 
     testSqlApi(
-      "FROM_TIMESTAMP(f13)",
-      "null"
-    )
-
-    testSqlApi(
       "TO_TIMESTAMP(SUBSTR('', 2, -1))",
       "null"
     )
@@ -606,11 +601,14 @@ class TemporalTypesTest extends ExpressionTestBase {
   }
 
   @Test
-  def testdebug() = {
-    testSqlApi("DATE_FORMAT('2018-03-14 01:02:03', 'yyyy/MM/dd HH:mm:ss')",
+  def testDateFormat(): Unit = {
+    testSqlApi(
+      "DATE_FORMAT('2018-03-14 01:02:03', 'yyyy/MM/dd HH:mm:ss')",
       "2018/03/14 01:02:03")
-    testSqlApi("DATE_FORMAT('2018-03-14 01:02:03', 'yyyy-MM-dd HH:mm:ss', " +
-        "'yyyy/MM/dd HH:mm:ss')", "2018/03/14 01:02:03")
+
+    testSqlApi(
+      s"DATE_FORMAT(${timestampTz("2018-03-14 01:02:03")}, 'yyyy-MM-dd HH:mm:ss')",
+      "2018-03-14 01:02:03")
   }
 
   @Test
@@ -659,12 +657,12 @@ class TemporalTypesTest extends ExpressionTestBase {
     testSqlApi(timestampTz("2018-03-14 19:00:00.010"), "2018-03-14 19:00:00.010")
 
     // DATE_FORMAT
-    testSqlApi("DATE_FORMAT('2018-03-14 01:02:03', 'yyyy/MM/dd HH:mm:ss')",
+    testSqlApi(
+      "DATE_FORMAT('2018-03-14 01:02:03', 'yyyy/MM/dd HH:mm:ss')",
       "2018/03/14 01:02:03")
-    testSqlApi("DATE_FORMAT('2018-03-14 01:02:03', 'yyyy-MM-dd HH:mm:ss', " +
-      "'yyyy/MM/dd HH:mm:ss')", "2018/03/14 01:02:03")
-    testSqlApi(s"DATE_FORMAT(${timestampTz("2018-03-14 01:02:03")}," +
-        " 'yyyy-MM-dd HH:mm:ss')",
+
+    testSqlApi(
+      s"DATE_FORMAT(${timestampTz("2018-03-14 01:02:03")}, 'yyyy-MM-dd HH:mm:ss')",
       "2018-03-14 01:02:03")
 
     // EXTRACT
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
index cf3a705..d6fa3a0 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
@@ -1079,13 +1079,11 @@ class CalcITCase extends BatchTestBase {
     //j 2015-05-20 10:00:00.887
     checkResult("SELECT j, " +
         " DATE_FORMAT(j, 'yyyy/MM/dd HH:mm:ss')," +
-        " DATE_FORMAT('2015-05-20 10:00:00.887', 'yyyy/MM/dd HH:mm:ss')," +
-        " DATE_FORMAT('2015-05-20 10:00:00.887', 'yyyy-MM-dd HH:mm:ss', 'yyyy/MM/dd HH:mm:ss')" +
+        " DATE_FORMAT('2015-05-20 10:00:00.887', 'yyyy/MM/dd HH:mm:ss')" +
         " FROM testTable WHERE a = TRUE",
       Seq(
         row(localDateTime("2015-05-20 10:00:00.887"),
           "2015/05/20 10:00:00",
-          "2015/05/20 10:00:00",
           "2015/05/20 10:00:00")
       ))
   }


[flink] 03/09: [FLINK-13561][table-planner-blink] Drop CONVERT_TZ(timestamp, format, from_tz, to_tz) function support

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 18bd93b1f924be538addadc4c7953c454d099104
Author: Jark Wu <im...@gmail.com>
AuthorDate: Wed Aug 7 13:40:57 2019 +0800

    [FLINK-13561][table-planner-blink] Drop CONVERT_TZ(timestamp, format, from_tz, to_tz) function support
    
    This commit drops CONVERT_TZ(timestamp, format, from_tz, to_tz) function support in blink planner to align with other systems. We only support CONVERT_TZ(timestamp, from_tz, to_tz) in this version.
---
 .../flink/table/planner/functions/sql/FlinkSqlOperatorTable.java   | 5 +----
 .../apache/flink/table/planner/codegen/calls/StringCallGen.scala   | 7 -------
 .../apache/flink/table/planner/expressions/TemporalTypesTest.scala | 6 +++---
 3 files changed, 4 insertions(+), 14 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
index 5219a11..e27e458 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
@@ -782,10 +782,7 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
 		SqlKind.OTHER_FUNCTION,
 		ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.VARCHAR), SqlTypeTransforms.FORCE_NULLABLE),
 		null,
-		OperandTypes.or(
-			OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.STRING, SqlTypeFamily.STRING),
-			OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.STRING,
-				SqlTypeFamily.STRING, SqlTypeFamily.STRING)),
+		OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.STRING, SqlTypeFamily.STRING),
 		SqlFunctionCategory.TIMEDATE);
 
 	public static final SqlFunction LOCATE = new SqlFunction(
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
index aac9010..7c4a129 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
@@ -227,13 +227,6 @@ object StringCallGen {
           isCharacterString(operands(2).resultType) =>
         methodGen(BuiltInMethods.CONVERT_TZ)
 
-      case CONVERT_TZ if operands.size == 4 &&
-          isCharacterString(operands.head.resultType) &&
-          isCharacterString(operands(1).resultType) &&
-          isCharacterString(operands(2).resultType) &&
-          isCharacterString(operands(3).resultType) =>
-        methodGen(BuiltInMethods.CONVERT_FORMAT_TZ)
-
       case _ => null
     }
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
index 86175d1..e4b98c5 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
@@ -854,7 +854,7 @@ class TemporalTypesTest extends ExpressionTestBase {
     testSqlApi(s"TO_TIMESTAMP('$invalidStr', 'yyyy-mm-dd')", nullable)
     testSqlApi(s"TO_DATE('$invalidStr')", nullable)
     testSqlApi(
-      s"CONVERT_TZ('$invalidStr', 'yyyy-MM-dd HH:mm:ss', 'UTC', 'Asia/Shanghai')",
+      s"CONVERT_TZ('$invalidStr', 'UTC', 'Asia/Shanghai')",
       nullable)
   }
 
@@ -865,7 +865,7 @@ class TemporalTypesTest extends ExpressionTestBase {
       s"DATE_FORMAT('$invalidStr', 'yyyy/MM/dd HH:mm:ss')",
       s"TO_TIMESTAMP('$invalidStr', 'yyyy-mm-dd')",
       s"TO_DATE('$invalidStr')",
-      s"CONVERT_TZ('$invalidStr', 'yyyy-MM-dd HH:mm:ss', 'UTC', 'Asia/Shanghai')")
+      s"CONVERT_TZ('$invalidStr', 'UTC', 'Asia/Shanghai')")
 
     cases.foreach {
       caseExpr =>
@@ -875,7 +875,7 @@ class TemporalTypesTest extends ExpressionTestBase {
 
   @Test
   def testConvertTZ(): Unit = {
-    testSqlApi("CONVERT_TZ('2018-03-14 11:00:00', 'yyyy-MM-dd HH:mm:ss', 'UTC', 'Asia/Shanghai')",
+    testSqlApi("CONVERT_TZ('2018-03-14 11:00:00', 'UTC', 'Asia/Shanghai')",
                "2018-03-14 19:00:00")
   }
 


[flink] 01/09: [FLINK-13561][table-planner-blink] Remove some builtin datetime functions which can be covered by existing functions

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1f905207f15bd9ceed8f9c3d2b3fcf0cc269e742
Author: Jark Wu <im...@gmail.com>
AuthorDate: Wed Aug 7 13:36:40 2019 +0800

    [FLINK-13561][table-planner-blink] Remove some builtin datetime functions which can be covered by existing functions
    
    Removes DATE_FORMAT_TZ, DATE_ADD,DATE_SUB, DATEDIFF, FROM_TIMESTAMP, TO_TIMESTAMP_TZ builtin functions which can be covered by existing functions.
---
 .../functions/sql/FlinkSqlOperatorTable.java       | 60 ----------------------
 .../planner/codegen/calls/BuiltInMethods.scala     | 16 ------
 .../planner/codegen/calls/FunctionGenerator.scala  | 23 ---------
 .../planner/codegen/calls/StringCallGen.scala      | 34 ------------
 .../planner/expressions/TemporalTypesTest.scala    | 37 ++-----------
 .../planner/runtime/batch/sql/CalcITCase.scala     | 29 -----------
 .../runtime/stream/sql/CorrelateITCase.scala       |  2 +-
 7 files changed, 6 insertions(+), 195 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
index e201400..d81b2f1 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
@@ -659,38 +659,6 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
 			OperandTypes.family(SqlTypeFamily.NUMERIC, SqlTypeFamily.STRING)),
 		SqlFunctionCategory.TIMEDATE);
 
-	public static final SqlFunction DATEDIFF = new SqlFunction(
-		"DATEDIFF",
-		SqlKind.OTHER_FUNCTION,
-		ReturnTypes.INTEGER_NULLABLE,
-		null,
-		OperandTypes.or(
-			OperandTypes.family(SqlTypeFamily.TIMESTAMP, SqlTypeFamily.STRING),
-			OperandTypes.family(SqlTypeFamily.TIMESTAMP, SqlTypeFamily.TIMESTAMP),
-			OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.TIMESTAMP),
-			OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.STRING)),
-		SqlFunctionCategory.TIMEDATE);
-
-	public static final SqlFunction DATE_SUB = new SqlFunction(
-		"DATE_SUB",
-		SqlKind.OTHER_FUNCTION,
-		VARCHAR_2000_NULLABLE,
-		null,
-		OperandTypes.or(
-			OperandTypes.family(SqlTypeFamily.TIMESTAMP, SqlTypeFamily.INTEGER),
-			OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.INTEGER)),
-		SqlFunctionCategory.TIMEDATE);
-
-	public static final SqlFunction DATE_ADD = new SqlFunction(
-		"DATE_ADD",
-		SqlKind.OTHER_FUNCTION,
-		VARCHAR_2000_NULLABLE,
-		null,
-		OperandTypes.or(
-			OperandTypes.family(SqlTypeFamily.TIMESTAMP, SqlTypeFamily.INTEGER),
-			OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.INTEGER)),
-		SqlFunctionCategory.TIMEDATE);
-
 	public static final SqlFunction IF = new SqlFunction(
 		"IF",
 		SqlKind.OTHER_FUNCTION,
@@ -800,14 +768,6 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
 			OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.STRING)),
 		SqlFunctionCategory.TIMEDATE);
 
-	public static final SqlFunction FROM_TIMESTAMP = new SqlFunction(
-		"FROM_TIMESTAMP",
-		SqlKind.OTHER_FUNCTION,
-		ReturnTypes.cascade(ReturnTypes.BIGINT, SqlTypeTransforms.TO_NULLABLE),
-		null,
-		OperandTypes.family(SqlTypeFamily.TIMESTAMP),
-		SqlFunctionCategory.TIMEDATE);
-
 	public static final SqlFunction TO_DATE = new SqlFunction(
 		"TO_DATE",
 		SqlKind.OTHER_FUNCTION,
@@ -819,26 +779,6 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
 			OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.STRING)),
 		SqlFunctionCategory.TIMEDATE);
 
-	public static final SqlFunction TO_TIMESTAMP_TZ = new SqlFunction(
-		"TO_TIMESTAMP_TZ",
-		SqlKind.OTHER_FUNCTION,
-		ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.TIMESTAMP), SqlTypeTransforms.FORCE_NULLABLE),
-		null,
-		OperandTypes.or(
-			OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.STRING),
-			OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.STRING, SqlTypeFamily.STRING)),
-		SqlFunctionCategory.TIMEDATE);
-
-	public static final SqlFunction DATE_FORMAT_TZ = new SqlFunction(
-		"DATE_FORMAT_TZ",
-		SqlKind.OTHER_FUNCTION,
-		ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.VARCHAR), SqlTypeTransforms.TO_NULLABLE),
-		InferTypes.RETURN_TYPE,
-		OperandTypes.or(
-			OperandTypes.family(SqlTypeFamily.TIMESTAMP, SqlTypeFamily.STRING, SqlTypeFamily.STRING),
-			OperandTypes.family(SqlTypeFamily.TIMESTAMP, SqlTypeFamily.STRING)),
-		SqlFunctionCategory.TIMEDATE);
-
 	public static final SqlFunction CONVERT_TZ = new SqlFunction(
 		"CONVERT_TZ",
 		SqlKind.OTHER_FUNCTION,
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
index 74b388a..dd91b52 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
@@ -460,22 +460,6 @@ object BuiltInMethods {
     "timestampCeil",
     classOf[TimeUnitRange], classOf[Long], classOf[TimeZone])
 
-  val STRING_TO_TIMESTAMP_TZ = Types.lookupMethod(
-    classOf[SqlDateTimeUtils],
-    "toTimestampTz",
-    classOf[String], classOf[String])
-
-  val STRING_TO_TIMESTAMP_FORMAT_TZ = Types.lookupMethod(
-    classOf[SqlDateTimeUtils],
-    "toTimestampTz",
-    classOf[String], classOf[String], classOf[String])
-
-  val DATE_FORMAT_LONG_ZONE = Types.lookupMethod(
-    classOf[SqlDateTimeUtils], "dateFormatTz", classOf[Long], classOf[String])
-
-  val DATE_FORMAT_LONG_STRING_ZONE = Types.lookupMethod(
-    classOf[SqlDateTimeUtils], "dateFormatTz", classOf[Long], classOf[String], classOf[String])
-
   val CONVERT_TZ = Types.lookupMethod(
     classOf[SqlDateTimeUtils],
     "convertTz",
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala
index 29c2643..348907a 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala
@@ -648,12 +648,6 @@ object FunctionGenerator {
     Seq(TIMESTAMP_WITH_LOCAL_TIME_ZONE),
     BuiltInMethods.UNIX_TIMESTAMP_TS)
 
-  addSqlFunctionMethod(
-    DATEDIFF,
-    Seq(TIMESTAMP_WITHOUT_TIME_ZONE,
-      TIMESTAMP_WITHOUT_TIME_ZONE),
-    BuiltInMethods.DATEDIFF_T_T)
-
   // This sequence must be in sync with [[NumericOrDefaultReturnTypeInference]]
   val numericTypes = Seq(
     INTEGER,
@@ -817,11 +811,6 @@ object FunctionGenerator {
     Seq(DECIMAL),
     BuiltInMethods.DECIMAL_TO_TIMESTAMP)
 
-  addSqlFunctionMethod(
-    FROM_TIMESTAMP,
-    Seq(TIMESTAMP_WITHOUT_TIME_ZONE),
-    BuiltInMethods.TIMESTAMP_TO_BIGINT)
-
   INTEGRAL_TYPES foreach (
     dt => addSqlFunctionMethod(
       FROM_UNIXTIME,
@@ -842,18 +831,6 @@ object FunctionGenerator {
   addSqlFunctionMethod(FROM_UNIXTIME, Seq(BIGINT, VARCHAR), BuiltInMethods.FROM_UNIXTIME_FORMAT)
   addSqlFunctionMethod(FROM_UNIXTIME, Seq(BIGINT, CHAR), BuiltInMethods.FROM_UNIXTIME_FORMAT)
 
-  addSqlFunctionMethod(DATE_SUB, Seq(VARCHAR, INTEGER), BuiltInMethods.DATE_SUB_S)
-  addSqlFunctionMethod(DATE_SUB, Seq(CHAR, INTEGER), BuiltInMethods.DATE_SUB_S)
-
-  addSqlFunctionMethod(
-    DATE_SUB, Seq(TIMESTAMP_WITHOUT_TIME_ZONE, INTEGER), BuiltInMethods.DATE_SUB_T)
-
-  addSqlFunctionMethod(DATE_ADD, Seq(VARCHAR, INTEGER), BuiltInMethods.DATE_ADD_S)
-  addSqlFunctionMethod(DATE_ADD, Seq(CHAR, INTEGER), BuiltInMethods.DATE_ADD_S)
-
-  addSqlFunctionMethod(
-    DATE_ADD, Seq(TIMESTAMP_WITHOUT_TIME_ZONE, INTEGER), BuiltInMethods.DATE_ADD_T)
-
   // ----------------------------------------------------------------------------------------------
 
   /**
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
index c542173..676eef6 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
@@ -206,18 +206,6 @@ object StringCallGen {
           isCharacterString(operands(1).resultType) =>
         methodGen(BuiltInMethods.UNIX_TIMESTAMP_FORMAT)
 
-      case DATEDIFF if isTimestamp(operands.head.resultType) &&
-          isCharacterString(operands(1).resultType) =>
-        methodGen(BuiltInMethods.DATEDIFF_T_S)
-
-      case DATEDIFF if isCharacterString(operands.head.resultType) &&
-          isCharacterString(operands(1).resultType) =>
-        methodGen(BuiltInMethods.DATEDIFF_S_S)
-
-      case DATEDIFF if isCharacterString(operands.head.resultType) &&
-          isTimestamp(operands(1).resultType) =>
-        methodGen(BuiltInMethods.DATEDIFF_S_T)
-
       case DATE_FORMAT if operands.size == 2 &&
           isTimestamp(operands.head.resultType) &&
           isCharacterString(operands(1).resultType) =>
@@ -239,28 +227,6 @@ object StringCallGen {
           isCharacterString(operands(2).resultType) =>
         methodGen(BuiltInMethods.DATE_FORMAT_STRING_STRING_STRING)
 
-      case TO_TIMESTAMP_TZ if operands.size == 2 &&
-          isCharacterString(operands.head.resultType) &&
-          isCharacterString(operands(1).resultType) =>
-        methodGen(BuiltInMethods.STRING_TO_TIMESTAMP_TZ)
-
-      case TO_TIMESTAMP_TZ if operands.size == 3 &&
-          isCharacterString(operands.head.resultType) &&
-          isCharacterString(operands(1).resultType) &&
-          isCharacterString(operands(2).resultType) =>
-        methodGen(BuiltInMethods.STRING_TO_TIMESTAMP_FORMAT_TZ)
-
-      case DATE_FORMAT_TZ if operands.size == 2 &&
-          isTimestamp(operands.head.resultType) &&
-          isCharacterString(operands(1).resultType) =>
-        methodGen(BuiltInMethods.DATE_FORMAT_LONG_ZONE)
-
-      case DATE_FORMAT_TZ if operands.size == 3 &&
-          isTimestamp(operands.head.resultType) &&
-          isCharacterString(operands(1).resultType) &&
-          isCharacterString(operands(2).resultType) =>
-        methodGen(BuiltInMethods.DATE_FORMAT_LONG_STRING_ZONE)
-
       case CONVERT_TZ if operands.size == 3 &&
           isCharacterString(operands.head.resultType) &&
           isCharacterString(operands(1).resultType) &&
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
index b2d72b1..90bdc92 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
@@ -839,28 +839,14 @@ class TemporalTypesTest extends ExpressionTestBase {
 
   @Test
   def testNullableCases(): Unit = {
-    testSqlApi(
-      "DATE_FORMAT_TZ(TO_TIMESTAMP(cast(NUll as bigInt)), 'yyyy/MM/dd HH:mm:ss', 'Asia/Shanghai')",
-      nullable)
-
-    testSqlApi("CONVERT_TZ(cast(NUll as varchar), 'yyyy-MM-dd HH:mm:ss', 'UTC', 'Asia/Shanghai')",
+    testSqlApi("CONVERT_TZ(cast(NULL as varchar), 'UTC', 'Asia/Shanghai')",
       nullable)
 
-    testSqlApi("FROM_TIMESTAMP(f13)", nullable)
-
-    testSqlApi("DATE_FORMAT(cast(NUll as varchar), 'yyyy/MM/dd HH:mm:ss')", nullable)
-
-    testSqlApi("UNIX_TIMESTAMP(TO_TIMESTAMP(cast(NUll as bigInt)))", nullable)
-
-    testSqlApi("FROM_UNIXTIME(cast(NUll as bigInt))", nullable)
+    testSqlApi("DATE_FORMAT(cast(NULL as varchar), 'yyyy/MM/dd HH:mm:ss')", nullable)
 
-    testSqlApi("TO_DATE(cast(NUll as varchar))", nullable)
+    testSqlApi("FROM_UNIXTIME(cast(NULL as bigInt))", nullable)
 
-    testSqlApi("TO_TIMESTAMP_TZ(cast(NUll as varchar), 'Asia/Shanghai')", nullable)
-
-    testSqlApi(
-      "DATE_FORMAT_TZ(cast(NUll as timestamp), 'yyyy/MM/dd HH:mm:ss', 'Asia/Shanghai')",
-      nullable)
+    testSqlApi("TO_DATE(cast(NULL as varchar))", nullable)
   }
 
   @Test
@@ -869,7 +855,6 @@ class TemporalTypesTest extends ExpressionTestBase {
     testSqlApi(s"DATE_FORMAT('$invalidStr', 'yyyy/MM/dd HH:mm:ss')", nullable)
     testSqlApi(s"TO_TIMESTAMP('$invalidStr', 'yyyy-mm-dd')", nullable)
     testSqlApi(s"TO_DATE('$invalidStr')", nullable)
-    testSqlApi(s"TO_TIMESTAMP_TZ('$invalidStr', 'Asia/Shanghai')", nullable)
     testSqlApi(
       s"CONVERT_TZ('$invalidStr', 'yyyy-MM-dd HH:mm:ss', 'UTC', 'Asia/Shanghai')",
       nullable)
@@ -882,7 +867,6 @@ class TemporalTypesTest extends ExpressionTestBase {
       s"DATE_FORMAT('$invalidStr', 'yyyy/MM/dd HH:mm:ss')",
       s"TO_TIMESTAMP('$invalidStr', 'yyyy-mm-dd')",
       s"TO_DATE('$invalidStr')",
-      s"TO_TIMESTAMP_TZ('$invalidStr', 'Asia/Shanghai')",
       s"CONVERT_TZ('$invalidStr', 'yyyy-MM-dd HH:mm:ss', 'UTC', 'Asia/Shanghai')")
 
     cases.foreach {
@@ -892,20 +876,9 @@ class TemporalTypesTest extends ExpressionTestBase {
   }
 
   @Test
-  def testTimeZoneFunction(): Unit = {
-    testSqlApi("TO_TIMESTAMP_TZ('2018-03-14 11:00:00', 'Asia/Shanghai')", "2018-03-14 03:00:00.000")
-    testSqlApi("TO_TIMESTAMP_TZ('2018-03-14 11:00:00', 'yyyy-MM-dd HH:mm:ss', 'Asia/Shanghai')",
-               "2018-03-14 03:00:00.000")
-
+  def testConvertTZ(): Unit = {
     testSqlApi("CONVERT_TZ('2018-03-14 11:00:00', 'yyyy-MM-dd HH:mm:ss', 'UTC', 'Asia/Shanghai')",
                "2018-03-14 19:00:00")
-
-    testSqlApi("TO_TIMESTAMP_TZ(f14, 'UTC')", "null")
-
-    // Note that, if timezone is invalid, here we follow the default behavior of JDK's getTimeZone()
-    // It will use UTC timezone by default.
-    // TODO: it is would be better to report the error at compiling stage. timezone/format codegen
-    testSqlApi("TO_TIMESTAMP_TZ('2018-03-14 11:00:00', 'invalid_tz')", "2018-03-14 11:00:00.000")
   }
 
   // ----------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
index ca98714..cf3a705 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
@@ -1168,35 +1168,6 @@ class CalcITCase extends BatchTestBase {
   }
 
   @Test
-  def testDateDiff(): Unit = {
-    checkResult("SELECT" +
-        " DATEDIFF('2017-12-14 01:00:34', '2016-12-14 12:00:00')," +
-        " DATEDIFF(TIMESTAMP '2017-12-14 01:00:23', '2016-08-14 12:00:00')," +
-        " DATEDIFF('2017-12-14 09:00:23', TIMESTAMP '2013-08-19 11:00:00')," +
-        " DATEDIFF(TIMESTAMP '2017-12-14 09:00:23', TIMESTAMP '2018-08-19 11:00:00')" +
-        " FROM testTable WHERE a = TRUE",
-      Seq(row(365, 487, 1578, -248)))
-  }
-
-  @Test
-  def testDateSub(): Unit = {
-    checkResult("SELECT" +
-        " DATE_SUB(TIMESTAMP '2017-12-14 09:00:23', 3)," +
-        " DATE_SUB('2017-12-14 09:00:23', -3)" +
-        " FROM testTable WHERE a = TRUE",
-      Seq(row("2017-12-11", "2017-12-17")))
-  }
-
-  @Test
-  def testDateAdd(): Unit = {
-    checkResult("SELECT" +
-        " DATE_ADD('2017-12-14', 4)," +
-        " DATE_ADD(TIMESTAMP '2017-12-14 09:10:20',-4)" +
-        " FROM testTable WHERE a = TRUE",
-      Seq(row("2017-12-18", "2017-12-10")))
-  }
-
-  @Test
   def testToDate(): Unit = {
     checkResult("SELECT" +
         " TO_DATE(CAST(null AS VARCHAR))," +
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CorrelateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CorrelateITCase.scala
index 4268460..fa6be95 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CorrelateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CorrelateITCase.scala
@@ -214,7 +214,7 @@ class CorrelateITCase extends StreamingTestBase {
     val sql =
       """
         |SELECT * FROM TMP1
-        |where DATE_ADD(v, 3) > DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd')
+        |where TIMESTAMP_ADD(day, 3, v) > DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd')
       """.stripMargin
 
     val sink = new TestingAppendSink


[flink] 08/09: [FLINK-13561][table-planner-blink] Fix NOW() should return TIMESTAMP instead of BIGINT.

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5ee3485e40248c74cca5325db0ab200be11bd7aa
Author: Jark Wu <im...@gmail.com>
AuthorDate: Wed Aug 7 11:59:57 2019 +0800

    [FLINK-13561][table-planner-blink] Fix NOW() should return TIMESTAMP instead of BIGINT.
    
    This aligns the behavior to other systems (MySQL, Spark). Because NOW() is Synonyms for CURRENT_TIMESTAMP.
---
 .../flink/table/planner/functions/sql/FlinkSqlOperatorTable.java    | 6 ++----
 .../apache/flink/table/planner/codegen/calls/BuiltInMethods.scala   | 3 ---
 .../flink/table/planner/codegen/calls/FunctionGenerator.scala       | 5 -----
 3 files changed, 2 insertions(+), 12 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
index b619eb5..b3d91ae 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
@@ -606,11 +606,9 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
 	public static final SqlFunction NOW = new SqlFunction(
 		"NOW",
 		SqlKind.OTHER_FUNCTION,
-		ReturnTypes.BIGINT,
+		ReturnTypes.explicit(SqlTypeName.TIMESTAMP, 0),
 		null,
-		OperandTypes.or(
-			OperandTypes.NILADIC,
-			OperandTypes.family(SqlTypeFamily.INTEGER)),
+		OperandTypes.NILADIC,
 		SqlFunctionCategory.TIMEDATE) {
 
 		@Override
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
index d40f9f0..11c6149 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
@@ -245,9 +245,6 @@ object BuiltInMethods {
   val NOW = Types.lookupMethod(
     classOf[SqlDateTimeUtils], "now")
 
-  val NOW_OFFSET = Types.lookupMethod(
-    classOf[SqlDateTimeUtils], "now", classOf[Long])
-
   val DATE_FORMAT_STRING_STRING_STRING_TIME_ZONE = Types.lookupMethod(
     classOf[SqlDateTimeUtils], "dateFormat", classOf[String],
     classOf[String], classOf[String], classOf[TimeZone])
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala
index bf80027..5769b55 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala
@@ -629,11 +629,6 @@ object FunctionGenerator {
     BuiltInMethods.NOW)
 
   addSqlFunctionMethod(
-    NOW,
-    Seq(INTEGER),
-    BuiltInMethods.NOW_OFFSET)
-
-  addSqlFunctionMethod(
     UNIX_TIMESTAMP,
     Seq(),
     BuiltInMethods.UNIX_TIMESTAMP)


[flink] 09/09: [FLINK-13561][table-planner-blink] Fix wrong result of TIMESTAMPADD(HOUR, interval, DATE)

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d61e6069985e20912443406c0492a169443b4c06
Author: Jark Wu <im...@gmail.com>
AuthorDate: Thu Aug 8 10:21:06 2019 +0800

    [FLINK-13561][table-planner-blink] Fix wrong result of TIMESTAMPADD(HOUR, interval, DATE)
    
    TIMESTAMPADD(HOUR, -1, DATE '2016-06-15') will get a wrong result of "2016-06-15", which should be "2016-06-14 23:00:00.000".
    
    This is a cherry-pick of FLINK-10009 to blink planner.
---
 .../planner/codegen/calls/ScalarOperatorGens.scala | 11 ++++--
 .../planner/expressions/ScalarFunctionsTest.scala  | 42 ++++++++++------------
 2 files changed, 28 insertions(+), 25 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
index e992a1b..5cc5412 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
@@ -168,8 +168,15 @@ object ScalarOperatorGens {
         generateBinaryArithmeticOperator(ctx, op, left.resultType, left, right)
 
       case (DATE, INTERVAL_DAY_TIME) =>
-        generateOperatorIfNotNull(ctx, new DateType(), left, right) {
-          (l, r) => s"$l $op ((int) ($r / ${MILLIS_PER_DAY}L))"
+        resultType.getTypeRoot match {
+          case DATE =>
+            generateOperatorIfNotNull(ctx, new DateType(), left, right) {
+              (l, r) => s"$l $op ((int) ($r / ${MILLIS_PER_DAY}L))"
+            }
+          case TIMESTAMP_WITHOUT_TIME_ZONE =>
+            generateOperatorIfNotNull(ctx, new TimestampType(), left, right) {
+              (l, r) => s"($l * ${MILLIS_PER_DAY}L) $op $r"
+            }
         }
 
       case (DATE, INTERVAL_YEAR_MONTH) =>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
index 93de62e..7c01910 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
@@ -24,9 +24,6 @@ import org.apache.flink.table.expressions.{Expression, ExpressionParser, TimeInt
 import org.apache.flink.table.planner.expressions.utils.ScalarTypesTestBase
 import org.junit.Test
 
-import java.sql.Timestamp
-import java.time.ZoneId
-
 class ScalarFunctionsTest extends ScalarTypesTestBase {
 
   // ----------------------------------------------------------------------------------------------
@@ -3474,26 +3471,25 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
       "timestampadd(DAY, 1, date '2016-06-15')",
       "2016-06-16")
 
-    // TODO support '2016-06-15'.toTimestamp
-//    testAllApis("2016-06-15".toTimestamp - 1.hour,
-//      "'2016-06-15'.toTimestamp - 1.hour",
-//      "timestampadd(HOUR, -1, date '2016-06-15')",
-//      "2016-06-14 23:00:00.0")
-
-//    testAllApis("2016-06-15".toTimestamp + 1.minute,
-//      "'2016-06-15'.toTimestamp + 1.minute",
-//      "timestampadd(MINUTE, 1, date '2016-06-15')",
-//      "2016-06-15 00:01:00.0")
-
-//    testAllApis("2016-06-15".toTimestamp - 1.second,
-//      "'2016-06-15'.toTimestamp - 1.second",
-//      "timestampadd(SQL_TSI_SECOND, -1, date '2016-06-15')",
-//      "2016-06-14 23:59:59.0")
-
-//    testAllApis("2016-06-15".toTimestamp + 1.second,
-//      "'2016-06-15'.toTimestamp + 1.second",
-//      "timestampadd(SECOND, 1, date '2016-06-15')",
-//      "2016-06-15 00:00:01.0")
+    testAllApis("2016-06-15".toTimestamp - 1.hour,
+      "'2016-06-15'.toTimestamp - 1.hour",
+      "timestampadd(HOUR, -1, date '2016-06-15')",
+      "2016-06-14 23:00:00.000")
+
+    testAllApis("2016-06-15".toTimestamp + 1.minute,
+      "'2016-06-15'.toTimestamp + 1.minute",
+      "timestampadd(MINUTE, 1, date '2016-06-15')",
+      "2016-06-15 00:01:00.000")
+
+    testAllApis("2016-06-15".toTimestamp - 1.second,
+      "'2016-06-15'.toTimestamp - 1.second",
+      "timestampadd(SQL_TSI_SECOND, -1, date '2016-06-15')",
+      "2016-06-14 23:59:59.000")
+
+    testAllApis("2016-06-15".toTimestamp + 1.second,
+      "'2016-06-15'.toTimestamp + 1.second",
+      "timestampadd(SECOND, 1, date '2016-06-15')",
+      "2016-06-15 00:00:01.000")
 
     testAllApis(nullOf(Types.SQL_TIMESTAMP) + 1.second,
       "nullOf(SQL_TIMESTAMP) + 1.second",


[flink] 04/09: [FLINK-13561][table-planner-blink] Drop TO_TIMESTAMP(bigint) function support

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9ac2a9c1cfed9da381c6b850efbca8d1ccd2ea5a
Author: Jark Wu <im...@gmail.com>
AuthorDate: Wed Aug 7 13:41:31 2019 +0800

    [FLINK-13561][table-planner-blink] Drop TO_TIMESTAMP(bigint) function support
    
    This commit drops TO_TIMESTAMP(bigint) function support in blink planner to align with other systems. We only support TO_TIMESTAMP(string [,format]) in this version.
---
 .../functions/sql/FlinkSqlOperatorTable.java       |  5 ++--
 .../planner/expressions/ScalarFunctionsTest.scala  | 34 ++--------------------
 .../table/runtime/functions/SqlDateTimeUtils.java  |  4 ++-
 3 files changed, 8 insertions(+), 35 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
index e27e458..4c702a4 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
@@ -761,9 +761,8 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
 		ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.TIMESTAMP), SqlTypeTransforms.FORCE_NULLABLE),
 		null,
 		OperandTypes.or(
-			OperandTypes.family(SqlTypeFamily.NUMERIC),
-			OperandTypes.family(SqlTypeFamily.STRING),
-			OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.STRING)),
+			OperandTypes.family(SqlTypeFamily.CHARACTER),
+			OperandTypes.family(SqlTypeFamily.CHARACTER, SqlTypeFamily.CHARACTER)),
 		SqlFunctionCategory.TIMEDATE);
 
 	public static final SqlFunction TO_DATE = new SqlFunction(
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
index 244812a..0e5f120 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
@@ -3531,6 +3531,9 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
     testSqlApi("to_timestamp(1513135677000)", "2017-12-13 03:27:57.000")
     testSqlApi("to_timestamp('2017-09-15 00:00:00')", "2017-09-15 00:00:00.000")
     testSqlApi("to_timestamp('20170915000000', 'yyyyMMddHHmmss')", "2017-09-15 00:00:00.000")
+    testSqlApi("to_timestamp('2017-09-15', 'yyyy-MM-dd')", "2017-09-15 00:00:00.000")
+    // test with null input
+    testSqlApi("to_timestamp(cast(null as varchar))", "null")
   }
 
   @Test
@@ -3989,37 +3992,6 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
   }
 
   @Test
-  def testToTimestampWithNumeric(): Unit = {
-    // Test integral and fractional numeric to timestamp.
-    testSqlApi(
-      "to_timestamp(f2)",
-      "1970-01-01 00:00:00.042")
-    testSqlApi(
-      "to_timestamp(f3)",
-      "1970-01-01 00:00:00.043")
-    testSqlApi(
-      "to_timestamp(f4)",
-      "1970-01-01 00:00:00.044")
-    testSqlApi(
-      "to_timestamp(f5)",
-      "1970-01-01 00:00:00.004")
-    testSqlApi(
-      "to_timestamp(f6)",
-      "1970-01-01 00:00:00.004")
-    testSqlApi(
-      "to_timestamp(f7)",
-      "1970-01-01 00:00:00.003")
-    // Test decimal to timestamp.
-    testSqlApi(
-      "to_timestamp(f15)",
-      "1969-12-31 23:59:58.769")
-    // test with null input
-    testSqlApi(
-      "to_timestamp(cast(null as varchar))",
-      "null")
-  }
-
-  @Test
   def testFromUnixTimeWithNumeric(): Unit = {
     // Test integral and fractional numeric from_unixtime.
     testSqlApi(
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlDateTimeUtils.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlDateTimeUtils.java
index 3cd1246..a6784bb 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlDateTimeUtils.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlDateTimeUtils.java
@@ -220,7 +220,9 @@ public class SqlDateTimeUtils {
 	public static Long toTimestamp(String dateStr, TimeZone tz) {
 		int length = dateStr.length();
 		String format;
-		if (length == 21) {
+		if (length == 10) {
+			format = DATE_FORMAT_STRING;
+		} else if (length == 21) {
 			format = DEFAULT_DATETIME_FORMATS[1];
 		} else if (length == 22) {
 			format = DEFAULT_DATETIME_FORMATS[2];


[flink] 06/09: [FLINK-13561][table-planner-blink] Fix FROM_UNIXTIME(bigint [, format]) should work in session time zone

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2403448e58cde1d4c017d719d55d4a23edbdf0d9
Author: Jark Wu <im...@gmail.com>
AuthorDate: Tue Aug 6 21:53:56 2019 +0800

    [FLINK-13561][table-planner-blink] Fix FROM_UNIXTIME(bigint [,format]) should work in session time zone
    
    This aligns the behavior to other systems (MySQL, Spark).
---
 .../functions/sql/FlinkSqlOperatorTable.java       |   4 +-
 .../flink/table/planner/codegen/CodeGenUtils.scala |   2 -
 .../planner/codegen/CodeGeneratorContext.scala     |  11 --
 .../planner/codegen/calls/BuiltInMethods.scala     |  25 +---
 .../planner/codegen/calls/FunctionGenerator.scala  |  16 ---
 .../planner/expressions/ScalarFunctionsTest.scala  |  62 +--------
 .../planner/expressions/TemporalTypesTest.scala    | 143 +++++++++------------
 .../planner/runtime/batch/sql/CalcITCase.scala     |   8 --
 .../runtime/stream/sql/CorrelateITCase.scala       |   2 +-
 9 files changed, 69 insertions(+), 204 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
index 91fd8c3..f78bd1c 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
@@ -653,8 +653,8 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
 		VARCHAR_2000_NULLABLE,
 		null,
 		OperandTypes.or(
-			OperandTypes.family(SqlTypeFamily.NUMERIC),
-			OperandTypes.family(SqlTypeFamily.NUMERIC, SqlTypeFamily.STRING)),
+			OperandTypes.family(SqlTypeFamily.INTEGER),
+			OperandTypes.family(SqlTypeFamily.INTEGER, SqlTypeFamily.STRING)),
 		SqlFunctionCategory.TIMEDATE);
 
 	public static final SqlFunction IF = new SqlFunction(
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
index 6f097f6..da859ef 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
@@ -47,8 +47,6 @@ object CodeGenUtils {
 
   val DEFAULT_TIMEZONE_TERM = "timeZone"
 
-  val DEFAULT_TIMEZONE_ID_TERM = "zoneId"
-
   val DEFAULT_INPUT1_TERM = "in1"
 
   val DEFAULT_INPUT2_TERM = "in2"
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
index 7e67a12..4244fd0 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
@@ -519,17 +519,6 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
   }
 
   /**
-    * Adds a reusable Time ZoneId to the member area of the generated class.
-    */
-  def addReusableTimeZoneID(): String = {
-    val zoneID = tableConfig.getLocalTimeZone.getId
-    val stmt =
-      s"""private static final java.time.ZoneId $DEFAULT_TIMEZONE_TERM =
-         |                 java.time.ZoneId.of("$zoneID");""".stripMargin
-    DEFAULT_TIMEZONE_ID_TERM
-  }
-
-  /**
     * Adds a reusable [[java.util.Random]] to the member area of the generated class.
     *
     * The seed parameter must be a literal/constant expression.
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
index ff4796b..c46d432 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
@@ -290,29 +290,11 @@ object BuiltInMethods {
     classOf[SqlDateTimeUtils], "unixTimestamp", classOf[Long])
 
   val FROM_UNIXTIME_FORMAT = Types.lookupMethod(
-    classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Long], classOf[String])
-
-  val FROM_UNIXTIME = Types.lookupMethod(
-    classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Long])
-
-  val FROM_UNIXTIME_AS_DOUBLE = Types.lookupMethod(
-    classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Double])
-
-  val FROM_UNIXTIME_AS_DECIMAL = Types.lookupMethod(
-    classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Decimal])
-
-  val FROM_UNIXTIME_FORMAT_TIME_ZONE = Types.lookupMethod(
     classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Long], classOf[String], classOf[TimeZone])
 
-  val FROM_UNIXTIME_TIME_ZONE = Types.lookupMethod(
+  val FROM_UNIXTIME = Types.lookupMethod(
     classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Long], classOf[TimeZone])
 
-  val FROM_UNIXTIME_AS_DOUBLE_TIME_ZONE = Types.lookupMethod(
-    classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Double], classOf[TimeZone])
-
-  val FROM_UNIXTIME_AS_DECIMAL_TIME_ZONE = Types.lookupMethod(
-    classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Decimal], classOf[TimeZone])
-
   val DATEDIFF_T_S_TIME_ZONE = Types.lookupMethod(
     classOf[SqlDateTimeUtils], "dateDiff", classOf[Long], classOf[String], classOf[TimeZone])
 
@@ -361,11 +343,6 @@ object BuiltInMethods {
   val DATE_ADD_T = Types.lookupMethod(
     classOf[SqlDateTimeUtils], "dateAdd", classOf[Long], classOf[Int])
 
-  val INT_TO_DATE = Types.lookupMethod(
-    classOf[SqlDateTimeUtils],
-    "toDate",
-    classOf[Int])
-
   val LONG_TO_TIMESTAMP = Types.lookupMethod(
     classOf[SqlDateTimeUtils],
     "toTimestamp",
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala
index 348907a..bf80027 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala
@@ -792,11 +792,6 @@ object FunctionGenerator {
     Seq(DECIMAL),
     new HashCodeCallGen())
 
-  addSqlFunctionMethod(
-    TO_DATE,
-    Seq(INTEGER),
-    BuiltInMethods.INT_TO_DATE)
-
   INTEGRAL_TYPES foreach (
     dt => addSqlFunctionMethod(TO_TIMESTAMP,
       Seq(dt),
@@ -817,17 +812,6 @@ object FunctionGenerator {
       Seq(dt),
       BuiltInMethods.FROM_UNIXTIME))
 
-  FRACTIONAL_TYPES foreach (
-    dt => addSqlFunctionMethod(
-      FROM_UNIXTIME,
-      Seq(dt),
-      BuiltInMethods.FROM_UNIXTIME_AS_DOUBLE))
-
-  addSqlFunctionMethod(
-    FROM_UNIXTIME,
-    Seq(DECIMAL),
-    BuiltInMethods.FROM_UNIXTIME_AS_DECIMAL)
-
   addSqlFunctionMethod(FROM_UNIXTIME, Seq(BIGINT, VARCHAR), BuiltInMethods.FROM_UNIXTIME_FORMAT)
   addSqlFunctionMethod(FROM_UNIXTIME, Seq(BIGINT, CHAR), BuiltInMethods.FROM_UNIXTIME_FORMAT)
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
index 0e5f120..ad225f3 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
@@ -22,9 +22,13 @@ import org.apache.flink.table.api.scala.{currentDate, currentTime, currentTimest
 import org.apache.flink.table.api.{DataTypes, Types}
 import org.apache.flink.table.expressions.{Expression, ExpressionParser, TimeIntervalUnit, TimePointUnit}
 import org.apache.flink.table.planner.expressions.utils.ScalarTypesTestBase
-
 import org.junit.Test
 
+import java.sql.Timestamp
+import java.text.SimpleDateFormat
+import java.time.ZoneId
+import java.util.Locale
+
 class ScalarFunctionsTest extends ScalarTypesTestBase {
 
   // ----------------------------------------------------------------------------------------------
@@ -3528,7 +3532,6 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
   @Test
   def testToTimestamp(): Unit = {
     testSqlApi("to_timestamp('abc')", "null")
-    testSqlApi("to_timestamp(1513135677000)", "2017-12-13 03:27:57.000")
     testSqlApi("to_timestamp('2017-09-15 00:00:00')", "2017-09-15 00:00:00.000")
     testSqlApi("to_timestamp('20170915000000', 'yyyyMMddHHmmss')", "2017-09-15 00:00:00.000")
     testSqlApi("to_timestamp('2017-09-15', 'yyyy-MM-dd')", "2017-09-15 00:00:00.000")
@@ -3541,30 +3544,6 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
     testSqlApi("to_date('2017-09-15 00:00:00')", "2017-09-15")
   }
 
-  @Test
-  def testDateSub(): Unit = {
-    testSqlApi("date_sub(f18, 10)", "1996-10-31")
-    testSqlApi("date_sub(f18, -10)", "1996-11-20")
-    testSqlApi("date_sub(TIMESTAMP '2017-10-15 23:00:00', 30)", "2017-09-15")
-    testSqlApi("date_sub(f40, 30)", "null")
-    testSqlApi("date_sub(CAST(NULL AS TIMESTAMP), 30)", "null")
-    testSqlApi("date_sub(CAST(NULL AS VARCHAR), 30)", "null")
-    testSqlApi("date_sub('2017-10--11', 30)", "null")
-    testSqlApi("date_sub('2017--10-11', 30)", "null")
-  }
-
-  @Test
-  def testDateAdd(): Unit = {
-    testSqlApi("date_add(f18, 10)", "1996-11-20")
-    testSqlApi("date_add(f18, -10)", "1996-10-31")
-    testSqlApi("date_add(TIMESTAMP '2017-10-15 23:00:00', 30)", "2017-11-14")
-    testSqlApi("date_add(f40, 30)", "null")
-    testSqlApi("date_add(CAST(NULL AS TIMESTAMP), 30)", "null")
-    testSqlApi("date_add(CAST(NULL AS VARCHAR), 30)", "null")
-    testSqlApi("date_add('2017-10--11', 30)", "null")
-    testSqlApi("date_add('2017--10-11', 30)", "null")
-  }
-
   // ----------------------------------------------------------------------------------------------
   // Hash functions
   // ----------------------------------------------------------------------------------------------
@@ -3992,37 +3971,6 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
   }
 
   @Test
-  def testFromUnixTimeWithNumeric(): Unit = {
-    // Test integral and fractional numeric from_unixtime.
-    testSqlApi(
-      "from_unixtime(f2)",
-      "1970-01-01 00:00:42")
-    testSqlApi(
-      "from_unixtime(f3)",
-      "1970-01-01 00:00:43")
-    testSqlApi(
-      "from_unixtime(f4)",
-      "1970-01-01 00:00:44")
-    testSqlApi(
-      "from_unixtime(f5)",
-      "1970-01-01 00:00:04")
-    testSqlApi(
-      "from_unixtime(f6)",
-      "1970-01-01 00:00:04")
-    testSqlApi(
-      "from_unixtime(f7)",
-      "1970-01-01 00:00:03")
-    // Test decimal to from_unixtime.
-    testSqlApi(
-      "from_unixtime(f15)",
-      "1969-12-31 23:39:29")
-    // test with null input
-    testSqlApi(
-      "from_unixtime(cast(null as int))",
-      "null")
-  }
-
-  @Test
   def testIsDecimal(): Unit = {
     testSqlApi(
       "IS_DECIMAL('1')",
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
index e4b98c5..4dc61a5 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
@@ -28,13 +28,12 @@ import org.apache.flink.table.planner.utils.DateTimeTestUtil
 import org.apache.flink.table.planner.utils.DateTimeTestUtil._
 import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
 import org.apache.flink.types.Row
-
 import org.junit.Test
 
 import java.sql.Timestamp
 import java.text.SimpleDateFormat
 import java.time.{Instant, ZoneId}
-import java.util.TimeZone
+import java.util.{Locale, TimeZone}
 
 class TemporalTypesTest extends ExpressionTestBase {
 
@@ -742,85 +741,6 @@ class TemporalTypesTest extends ExpressionTestBase {
   }
 
   @Test
-  def testUTCTimeZone(): Unit = {
-    config.setLocalTimeZone(ZoneId.of("UTC"))
-
-    // Test Calcite's RexLiteral
-    // 1521025200000  =  UTC: 2018-03-14 11:00:00
-    testSqlApi("TIMESTAMP '2018-03-14 11:00:00'", "2018-03-14 11:00:00.000")
-
-    testSqlApi("DATE '2018-03-14'", "2018-03-14")
-    testSqlApi("TIME '19:20:21'", "19:20:21")
-
-    testSqlApi("TO_TIMESTAMP('2018-03-14 11:00:00', 'yyyy-MM-dd HH:mm:ss')",
-      "2018-03-14 11:00:00.000")
-    testSqlApi("TO_TIMESTAMP('2018-03-14 11:00:00')",
-      "2018-03-14 11:00:00.000")
-    testSqlApi("TO_TIMESTAMP(1521025200000)", "2018-03-14 11:00:00.000")
-
-    // 1521025200000  "2018-03-14T11:00:00+0000"
-    testSqlApi("FROM_UNIXTIME(1521025200)",
-      "2018-03-14 11:00:00")
-    testSqlApi("FROM_UNIXTIME(1521025200, 'yyyy-MM-dd HH:mm:ss')",
-      "2018-03-14 11:00:00")
-    testSqlApi("FROM_UNIXTIME(UNIX_TIMESTAMP(TO_TIMESTAMP('2018-03-14 11:00:00.0')))",
-      "2018-03-14 11:00:00")
-    testSqlApi("FROM_UNIXTIME(UNIX_TIMESTAMP('2018-03-14 11:00:00.0'))",
-      "2018-03-14 11:00:00")
-
-    // 1520960523000  "2018-03-13T17:02:03+0000"
-    testSqlApi("Date_ADD(TO_TIMESTAMP(1520960523000), 2)", "2018-03-15")
-    testSqlApi("Date_ADD(TO_TIMESTAMP('2018-03-13 17:02:03'), 2)", "2018-03-15")
-    testSqlApi("Date_ADD('2018-03-13 17:02:03', 2)", "2018-03-15")
-    testSqlApi("Date_SUB(TO_TIMESTAMP(1520960523000), 2)", "2018-03-11")
-    testSqlApi("Date_SUB(TO_TIMESTAMP('2018-03-13 17:02:03'), 2)", "2018-03-11")
-    testSqlApi("Date_SUB('2018-03-13 17:02:03', 2)", "2018-03-11")
-    testSqlApi("date_add('2017--10-11', 30)", "null")
-    testSqlApi("date_sub('2017--10-11', 30)", "null")
-
-    // DATE_DIFF
-    testSqlApi("DATEDIFF(TO_TIMESTAMP(1520960523000), '2018-03-13 17:02:03')", "0")
-    testSqlApi("DATEDIFF(TO_TIMESTAMP(1520827201000), TO_TIMESTAMP(1520740801000))", "1")
-
-    // DATE_FORMAT
-    // 1520960523000  "2018-03-13 17:02:03+0000"
-    testSqlApi("DATE_FORMAT('2018-03-13 17:02:03', 'yyyy-MM-dd HH:mm:ss', " +
-      "'yyyy/MM/dd HH:mm:ss')", "2018/03/13 17:02:03")
-    testSqlApi("DATE_FORMAT(TO_TIMESTAMP(1520960523000), 'yyyy-MM-dd HH:mm:ss')",
-      "2018-03-13 17:02:03")
-  }
-
-  @Test
-  def testDaylightSavingTimeZone(): Unit = {
-    config.setLocalTimeZone(ZoneId.of("America/New_York"))
-
-    // TODO: add more testcases & fully support DST
-    // Daylight Saving
-    // America/New_York:  -5:00,  -4:00(DST)
-    // 2018-03-11 02:00:00 -> 2018:-3-11 03:00:00
-    // 2018-11-04 02:00:00 -> 2018-11-04 01:00:00
-
-    // Test Calcite's RexLiteral
-    testSqlApi("TIMESTAMP '2018-03-14 07:00:00'", "2018-03-14 07:00:00.000")
-
-    testSqlApi("TO_TIMESTAMP('2018-03-14 07:00:00', 'yyyy-MM-dd HH:mm:ss')",
-      "2018-03-14 07:00:00.000")
-    testSqlApi("TO_TIMESTAMP('2018-03-14 07:00:00')",
-      "2018-03-14 07:00:00.000")
-    testSqlApi("f18", "2018-03-14 07:00:00.000")
-
-    testSqlApi("FROM_UNIXTIME(UNIX_TIMESTAMP(TO_TIMESTAMP('2018-03-14 07:00:00.0')))",
-      "2018-03-14 07:00:00")
-    testSqlApi("FROM_UNIXTIME(UNIX_TIMESTAMP('2018-03-14 07:00:00.0'))",
-      "2018-03-14 07:00:00")
-
-    // DATE_FORMAT
-    testSqlApi("DATE_FORMAT('2018-03-13 13:02:03', 'yyyy-MM-dd HH:mm:ss', " +
-      "'yyyy/MM/dd HH:mm:ss')", "2018/03/13 13:02:03")
-    testSqlApi("DATE_FORMAT(f19, 'yyyy-MM-dd HH:mm:ss')", "2018-03-13 13:02:03")
-  }
-
-  @Test
   def testHourUnitRangoonTimeZone(): Unit = {
     // Asia/Rangoon UTC Offset 6.5
     config.setLocalTimeZone(ZoneId.of("Asia/Rangoon"))
@@ -879,10 +799,63 @@ class TemporalTypesTest extends ExpressionTestBase {
                "2018-03-14 19:00:00")
   }
 
+  @Test
+  def testFromUnixTime(): Unit = {
+    val sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
+    val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS"
+    val sdf2 = new SimpleDateFormat(fmt2, Locale.US)
+    val fmt3 = "yy-MM-dd HH-mm-ss"
+    val sdf3 = new SimpleDateFormat(fmt3, Locale.US)
+
+    testSqlApi(
+      "from_unixtime(f21)",
+      sdf1.format(new Timestamp(44000)))
+    testSqlApi(
+      s"from_unixtime(f21, '$fmt2')",
+      sdf2.format(new Timestamp(44000)))
+    testSqlApi(
+      s"from_unixtime(f21, '$fmt3')",
+      sdf3.format(new Timestamp(44000)))
+
+    testSqlApi(
+      "from_unixtime(f22)",
+      sdf1.format(new Timestamp(3000)))
+    testSqlApi(
+      s"from_unixtime(f22, '$fmt2')",
+      sdf2.format(new Timestamp(3000)))
+    testSqlApi(
+      s"from_unixtime(f22, '$fmt3')",
+      sdf3.format(new Timestamp(3000)))
+
+    // test with null input
+    testSqlApi(
+      "from_unixtime(cast(null as int))",
+      "null")
+  }
+
+  @Test
+  def testFromUnixTimeInTokyo(): Unit = {
+    config.setLocalTimeZone(ZoneId.of("Asia/Tokyo"))
+    val fmt = "yy-MM-dd HH-mm-ss"
+    testSqlApi(
+      "from_unixtime(f21)",
+      "1970-01-01 09:00:44")
+    testSqlApi(
+      s"from_unixtime(f21, '$fmt')",
+      "70-01-01 09-00-44")
+
+    testSqlApi(
+      "from_unixtime(f22)",
+      "1970-01-01 09:00:03")
+    testSqlApi(
+      s"from_unixtime(f22, '$fmt')",
+      "70-01-01 09-00-03")
+  }
+
   // ----------------------------------------------------------------------------------------------
 
   override def testData: Row = {
-    val testData = new Row(21)
+    val testData = new Row(23)
     testData.setField(0, localDate("1990-10-14"))
     testData.setField(1, DateTimeTestUtil.localTime("10:20:45"))
     testData.setField(2, localDateTime("1990-10-14 10:20:45.123"))
@@ -908,6 +881,8 @@ class TemporalTypesTest extends ExpressionTestBase {
     testData.setField(18, Instant.ofEpochMilli(1521025200000L))
     testData.setField(19, Instant.ofEpochMilli(1520960523000L))
     testData.setField(20, Instant.ofEpochMilli(1520827201000L))
+    testData.setField(21, 44L)
+    testData.setField(22, 3)
     testData
   }
 
@@ -933,6 +908,8 @@ class TemporalTypesTest extends ExpressionTestBase {
       /* 17 */ Types.INSTANT,
       /* 18 */ Types.INSTANT,
       /* 19 */ Types.INSTANT,
-      /* 20 */ Types.INSTANT)
+      /* 20 */ Types.INSTANT,
+      /* 21 */ Types.LONG,
+      /* 22 */ Types.INT)
   }
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
index d6fa3a0..4210ce9 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
@@ -1158,14 +1158,6 @@ class CalcITCase extends BatchTestBase {
   }
 
   @Test
-  def testFromUnixTime(): Unit = {
-    checkResult("SELECT" +
-        " FROM_UNIXTIME(1513193130), FROM_UNIXTIME(1513193130, 'MM/dd/yyyy HH:mm:ss')" +
-        " FROM testTable WHERE a = TRUE",
-      Seq(row("2017-12-13 19:25:30", "12/13/2017 19:25:30")))
-  }
-
-  @Test
   def testToDate(): Unit = {
     checkResult("SELECT" +
         " TO_DATE(CAST(null AS VARCHAR))," +
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CorrelateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CorrelateITCase.scala
index fa6be95..91f7922 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CorrelateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CorrelateITCase.scala
@@ -214,7 +214,7 @@ class CorrelateITCase extends StreamingTestBase {
     val sql =
       """
         |SELECT * FROM TMP1
-        |where TIMESTAMP_ADD(day, 3, v) > DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd')
+        |where TIMESTAMPADD(day, 3, cast(v as date)) > DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd')
       """.stripMargin
 
     val sink = new TestingAppendSink


[flink] 07/09: [FLINK-13561][table-planner-blink] Fix UNIX_TIMESTAMP(string [, format]) should work in session time zone

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 45d0aef20da019bd447ad7910629de7195ef9afa
Author: Jark Wu <im...@gmail.com>
AuthorDate: Wed Aug 7 13:41:21 2019 +0800

    [FLINK-13561][table-planner-blink] Fix UNIX_TIMESTAMP(string [,format]) should work in session time zone
    
    This aligns the behavior to other systems (MySQL, Spark). UNIX_TIMESTAMP(string [,format]) is an inverse of FROM_UNIXTIME(bigint [,format]). We also remove the support of UNIX_TIMESTAMP(timestamp) in this commit.
---
 .../functions/sql/FlinkSqlOperatorTable.java       |  3 +-
 .../planner/codegen/calls/BuiltInMethods.scala     |  9 -----
 .../planner/expressions/ScalarFunctionsTest.scala  |  2 --
 .../planner/expressions/TemporalTypesTest.scala    | 41 ++++++++++++++++++++++
 .../planner/runtime/batch/sql/CalcITCase.scala     |  9 -----
 5 files changed, 42 insertions(+), 22 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
index f78bd1c..b619eb5 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
@@ -632,8 +632,7 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
 		OperandTypes.or(
 			OperandTypes.NILADIC,
 			OperandTypes.family(SqlTypeFamily.STRING),
-			OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.STRING),
-			OperandTypes.family(SqlTypeFamily.TIMESTAMP)),
+			OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.STRING)),
 		SqlFunctionCategory.TIMEDATE) {
 
 		@Override
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
index c46d432..d40f9f0 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
@@ -268,19 +268,10 @@ object BuiltInMethods {
     classOf[SqlDateTimeUtils],
     "unixTimestamp",
     classOf[String],
-    classOf[String])
-
-  val UNIX_TIMESTAMP_FORMAT_TIME_ZONE = Types.lookupMethod(
-    classOf[SqlDateTimeUtils],
-    "unixTimestamp",
-    classOf[String],
     classOf[String],
     classOf[TimeZone])
 
   val UNIX_TIMESTAMP_STR = Types.lookupMethod(
-    classOf[SqlDateTimeUtils], "unixTimestamp", classOf[String])
-
-  val UNIX_TIMESTAMP_STR_TIME_ZONE = Types.lookupMethod(
     classOf[SqlDateTimeUtils], "unixTimestamp", classOf[String], classOf[TimeZone])
 
   val UNIX_TIMESTAMP = Types.lookupMethod(
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
index ad225f3..93de62e 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
@@ -25,9 +25,7 @@ import org.apache.flink.table.planner.expressions.utils.ScalarTypesTestBase
 import org.junit.Test
 
 import java.sql.Timestamp
-import java.text.SimpleDateFormat
 import java.time.ZoneId
-import java.util.Locale
 
 class ScalarFunctionsTest extends ScalarTypesTestBase {
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
index 4dc61a5..27965f9 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
@@ -741,6 +741,20 @@ class TemporalTypesTest extends ExpressionTestBase {
   }
 
   @Test
+  def testDaylightSavingTimeZone(): Unit = {
+    // test from MySQL
+    // https://dev.mysql.com/doc/refman/8.0/en/date-and-time-functions.html#function_unix-timestamp
+    // due to conventions for local time zone changes such as Daylight Saving Time (DST),
+    // it is possible for UNIX_TIMESTAMP() to map two values that are distinct in a non-UTC
+    // time zone to the same Unix timestamp value
+    config.setLocalTimeZone(ZoneId.of("MET")) // Europe/Amsterdam
+
+    testSqlApi("UNIX_TIMESTAMP('2005-03-27 03:00:00')", "1111885200")
+    testSqlApi("UNIX_TIMESTAMP('2005-03-27 02:00:00')", "1111885200")
+    testSqlApi("FROM_UNIXTIME(1111885200)", "2005-03-27 03:00:00")
+  }
+
+  @Test
   def testHourUnitRangoonTimeZone(): Unit = {
     // Asia/Rangoon UTC Offset 6.5
     config.setLocalTimeZone(ZoneId.of("Asia/Rangoon"))
@@ -852,6 +866,33 @@ class TemporalTypesTest extends ExpressionTestBase {
       "70-01-01 09-00-03")
   }
 
+  @Test
+  def testUnixTimestamp(): Unit = {
+    val ts1 = Timestamp.valueOf("2015-07-24 10:00:00.3")
+    val ts2 = Timestamp.valueOf("2015-07-25 02:02:02.2")
+    val s1 = "2015/07/24 10:00:00.5"
+    val s2 = "2015/07/25 02:02:02.6"
+    val ss1 = "2015-07-24 10:00:00"
+    val ss2 = "2015-07-25 02:02:02"
+    val fmt = "yyyy/MM/dd HH:mm:ss.S"
+
+    testSqlApi(s"UNIX_TIMESTAMP('$ss1')", (ts1.getTime / 1000L).toString)
+    testSqlApi(s"UNIX_TIMESTAMP('$ss2')", (ts2.getTime / 1000L).toString)
+    testSqlApi(s"UNIX_TIMESTAMP('$s1', '$fmt')", (ts1.getTime / 1000L).toString)
+    testSqlApi(s"UNIX_TIMESTAMP('$s2', '$fmt')", (ts2.getTime / 1000L).toString)
+  }
+
+  @Test
+  def testUnixTimestampInTokyo(): Unit = {
+    config.setLocalTimeZone(ZoneId.of("Asia/Tokyo"))
+    testSqlApi(
+      "UNIX_TIMESTAMP('2015-07-24 10:00:00')",
+      "1437699600")
+    testSqlApi(
+      "UNIX_TIMESTAMP('2015/07/24 10:00:00.5', 'yyyy/MM/dd HH:mm:ss.S')",
+      "1437699600")
+  }
+
   // ----------------------------------------------------------------------------------------------
 
   override def testData: Row = {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
index 4210ce9..1bb3711 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
@@ -1149,15 +1149,6 @@ class CalcITCase extends BatchTestBase {
   }
 
   @Test
-  def testUnixTimestamp(): Unit = {
-    checkResult("SELECT" +
-        " UNIX_TIMESTAMP('2017-12-13 19:25:30')," +
-        " UNIX_TIMESTAMP('2017-12-13 19:25:30', 'yyyy-MM-dd HH:mm:ss')" +
-        " FROM testTable WHERE a = TRUE",
-      Seq(row(1513193130, 1513193130)))
-  }
-
-  @Test
   def testToDate(): Unit = {
     checkResult("SELECT" +
         " TO_DATE(CAST(null AS VARCHAR))," +


[flink] 05/09: [FLINK-13561][table-planner-blink] Drop TO_DATE(int) function support

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a58fb83352cddd65ce8559f3296851f2ff2133ec
Author: Jark Wu <im...@gmail.com>
AuthorDate: Wed Aug 7 13:41:38 2019 +0800

    [FLINK-13561][table-planner-blink] Drop TO_DATE(int) function support
    
    This commit drops TO_DATE(int) function support in blink planner to align with other systems. We only support TO_DATE(string [,format]) in this version.
---
 .../apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java  | 1 -
 1 file changed, 1 deletion(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
index 4c702a4..91fd8c3 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
@@ -771,7 +771,6 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
 		ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.DATE), SqlTypeTransforms.FORCE_NULLABLE),
 		null,
 		OperandTypes.or(
-			OperandTypes.family(SqlTypeFamily.NUMERIC),
 			OperandTypes.family(SqlTypeFamily.STRING),
 			OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.STRING)),
 		SqlFunctionCategory.TIMEDATE);