You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/08 12:38:28 UTC

[flink] 06/09: [FLINK-13561][table-planner-blink] Fix FROM_UNIXTIME(bigint [, format]) should work in session time zone

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8dbc64105c13dfae467f2cfb75ec67d0d2d56c84
Author: Jark Wu <im...@gmail.com>
AuthorDate: Tue Aug 6 21:53:56 2019 +0800

    [FLINK-13561][table-planner-blink] Fix FROM_UNIXTIME(bigint [,format]) should work in session time zone
    
    This aligns the behavior to other systems (MySQL, Spark).
---
 .../functions/sql/FlinkSqlOperatorTable.java       |   4 +-
 .../flink/table/planner/codegen/CodeGenUtils.scala |   2 -
 .../planner/codegen/CodeGeneratorContext.scala     |  11 --
 .../planner/codegen/calls/BuiltInMethods.scala     |  25 +---
 .../planner/codegen/calls/FunctionGenerator.scala  |  16 ---
 .../planner/expressions/ScalarFunctionsTest.scala  |  62 +--------
 .../planner/expressions/TemporalTypesTest.scala    | 143 +++++++++------------
 .../planner/runtime/batch/sql/CalcITCase.scala     |   8 --
 .../runtime/stream/sql/CorrelateITCase.scala       |   2 +-
 9 files changed, 69 insertions(+), 204 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
index 91fd8c3..f78bd1c 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
@@ -653,8 +653,8 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
 		VARCHAR_2000_NULLABLE,
 		null,
 		OperandTypes.or(
-			OperandTypes.family(SqlTypeFamily.NUMERIC),
-			OperandTypes.family(SqlTypeFamily.NUMERIC, SqlTypeFamily.STRING)),
+			OperandTypes.family(SqlTypeFamily.INTEGER),
+			OperandTypes.family(SqlTypeFamily.INTEGER, SqlTypeFamily.STRING)),
 		SqlFunctionCategory.TIMEDATE);
 
 	public static final SqlFunction IF = new SqlFunction(
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
index 4ecfdbb..473b788 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
@@ -47,8 +47,6 @@ object CodeGenUtils {
 
   val DEFAULT_TIMEZONE_TERM = "timeZone"
 
-  val DEFAULT_TIMEZONE_ID_TERM = "zoneId"
-
   val DEFAULT_INPUT1_TERM = "in1"
 
   val DEFAULT_INPUT2_TERM = "in2"
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
index 7e67a12..4244fd0 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala
@@ -519,17 +519,6 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
   }
 
   /**
-    * Adds a reusable Time ZoneId to the member area of the generated class.
-    */
-  def addReusableTimeZoneID(): String = {
-    val zoneID = tableConfig.getLocalTimeZone.getId
-    val stmt =
-      s"""private static final java.time.ZoneId $DEFAULT_TIMEZONE_TERM =
-         |                 java.time.ZoneId.of("$zoneID");""".stripMargin
-    DEFAULT_TIMEZONE_ID_TERM
-  }
-
-  /**
     * Adds a reusable [[java.util.Random]] to the member area of the generated class.
     *
     * The seed parameter must be a literal/constant expression.
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
index ff4796b..c46d432 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
@@ -290,29 +290,11 @@ object BuiltInMethods {
     classOf[SqlDateTimeUtils], "unixTimestamp", classOf[Long])
 
   val FROM_UNIXTIME_FORMAT = Types.lookupMethod(
-    classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Long], classOf[String])
-
-  val FROM_UNIXTIME = Types.lookupMethod(
-    classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Long])
-
-  val FROM_UNIXTIME_AS_DOUBLE = Types.lookupMethod(
-    classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Double])
-
-  val FROM_UNIXTIME_AS_DECIMAL = Types.lookupMethod(
-    classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Decimal])
-
-  val FROM_UNIXTIME_FORMAT_TIME_ZONE = Types.lookupMethod(
     classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Long], classOf[String], classOf[TimeZone])
 
-  val FROM_UNIXTIME_TIME_ZONE = Types.lookupMethod(
+  val FROM_UNIXTIME = Types.lookupMethod(
     classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Long], classOf[TimeZone])
 
-  val FROM_UNIXTIME_AS_DOUBLE_TIME_ZONE = Types.lookupMethod(
-    classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Double], classOf[TimeZone])
-
-  val FROM_UNIXTIME_AS_DECIMAL_TIME_ZONE = Types.lookupMethod(
-    classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Decimal], classOf[TimeZone])
-
   val DATEDIFF_T_S_TIME_ZONE = Types.lookupMethod(
     classOf[SqlDateTimeUtils], "dateDiff", classOf[Long], classOf[String], classOf[TimeZone])
 
@@ -361,11 +343,6 @@ object BuiltInMethods {
   val DATE_ADD_T = Types.lookupMethod(
     classOf[SqlDateTimeUtils], "dateAdd", classOf[Long], classOf[Int])
 
-  val INT_TO_DATE = Types.lookupMethod(
-    classOf[SqlDateTimeUtils],
-    "toDate",
-    classOf[Int])
-
   val LONG_TO_TIMESTAMP = Types.lookupMethod(
     classOf[SqlDateTimeUtils],
     "toTimestamp",
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala
index 348907a..bf80027 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala
@@ -792,11 +792,6 @@ object FunctionGenerator {
     Seq(DECIMAL),
     new HashCodeCallGen())
 
-  addSqlFunctionMethod(
-    TO_DATE,
-    Seq(INTEGER),
-    BuiltInMethods.INT_TO_DATE)
-
   INTEGRAL_TYPES foreach (
     dt => addSqlFunctionMethod(TO_TIMESTAMP,
       Seq(dt),
@@ -817,17 +812,6 @@ object FunctionGenerator {
       Seq(dt),
       BuiltInMethods.FROM_UNIXTIME))
 
-  FRACTIONAL_TYPES foreach (
-    dt => addSqlFunctionMethod(
-      FROM_UNIXTIME,
-      Seq(dt),
-      BuiltInMethods.FROM_UNIXTIME_AS_DOUBLE))
-
-  addSqlFunctionMethod(
-    FROM_UNIXTIME,
-    Seq(DECIMAL),
-    BuiltInMethods.FROM_UNIXTIME_AS_DECIMAL)
-
   addSqlFunctionMethod(FROM_UNIXTIME, Seq(BIGINT, VARCHAR), BuiltInMethods.FROM_UNIXTIME_FORMAT)
   addSqlFunctionMethod(FROM_UNIXTIME, Seq(BIGINT, CHAR), BuiltInMethods.FROM_UNIXTIME_FORMAT)
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
index 0e5f120..ad225f3 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
@@ -22,9 +22,13 @@ import org.apache.flink.table.api.scala.{currentDate, currentTime, currentTimest
 import org.apache.flink.table.api.{DataTypes, Types}
 import org.apache.flink.table.expressions.{Expression, ExpressionParser, TimeIntervalUnit, TimePointUnit}
 import org.apache.flink.table.planner.expressions.utils.ScalarTypesTestBase
-
 import org.junit.Test
 
+import java.sql.Timestamp
+import java.text.SimpleDateFormat
+import java.time.ZoneId
+import java.util.Locale
+
 class ScalarFunctionsTest extends ScalarTypesTestBase {
 
   // ----------------------------------------------------------------------------------------------
@@ -3528,7 +3532,6 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
   @Test
   def testToTimestamp(): Unit = {
     testSqlApi("to_timestamp('abc')", "null")
-    testSqlApi("to_timestamp(1513135677000)", "2017-12-13 03:27:57.000")
     testSqlApi("to_timestamp('2017-09-15 00:00:00')", "2017-09-15 00:00:00.000")
     testSqlApi("to_timestamp('20170915000000', 'yyyyMMddHHmmss')", "2017-09-15 00:00:00.000")
     testSqlApi("to_timestamp('2017-09-15', 'yyyy-MM-dd')", "2017-09-15 00:00:00.000")
@@ -3541,30 +3544,6 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
     testSqlApi("to_date('2017-09-15 00:00:00')", "2017-09-15")
   }
 
-  @Test
-  def testDateSub(): Unit = {
-    testSqlApi("date_sub(f18, 10)", "1996-10-31")
-    testSqlApi("date_sub(f18, -10)", "1996-11-20")
-    testSqlApi("date_sub(TIMESTAMP '2017-10-15 23:00:00', 30)", "2017-09-15")
-    testSqlApi("date_sub(f40, 30)", "null")
-    testSqlApi("date_sub(CAST(NULL AS TIMESTAMP), 30)", "null")
-    testSqlApi("date_sub(CAST(NULL AS VARCHAR), 30)", "null")
-    testSqlApi("date_sub('2017-10--11', 30)", "null")
-    testSqlApi("date_sub('2017--10-11', 30)", "null")
-  }
-
-  @Test
-  def testDateAdd(): Unit = {
-    testSqlApi("date_add(f18, 10)", "1996-11-20")
-    testSqlApi("date_add(f18, -10)", "1996-10-31")
-    testSqlApi("date_add(TIMESTAMP '2017-10-15 23:00:00', 30)", "2017-11-14")
-    testSqlApi("date_add(f40, 30)", "null")
-    testSqlApi("date_add(CAST(NULL AS TIMESTAMP), 30)", "null")
-    testSqlApi("date_add(CAST(NULL AS VARCHAR), 30)", "null")
-    testSqlApi("date_add('2017-10--11', 30)", "null")
-    testSqlApi("date_add('2017--10-11', 30)", "null")
-  }
-
   // ----------------------------------------------------------------------------------------------
   // Hash functions
   // ----------------------------------------------------------------------------------------------
@@ -3992,37 +3971,6 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
   }
 
   @Test
-  def testFromUnixTimeWithNumeric(): Unit = {
-    // Test integral and fractional numeric from_unixtime.
-    testSqlApi(
-      "from_unixtime(f2)",
-      "1970-01-01 00:00:42")
-    testSqlApi(
-      "from_unixtime(f3)",
-      "1970-01-01 00:00:43")
-    testSqlApi(
-      "from_unixtime(f4)",
-      "1970-01-01 00:00:44")
-    testSqlApi(
-      "from_unixtime(f5)",
-      "1970-01-01 00:00:04")
-    testSqlApi(
-      "from_unixtime(f6)",
-      "1970-01-01 00:00:04")
-    testSqlApi(
-      "from_unixtime(f7)",
-      "1970-01-01 00:00:03")
-    // Test decimal to from_unixtime.
-    testSqlApi(
-      "from_unixtime(f15)",
-      "1969-12-31 23:39:29")
-    // test with null input
-    testSqlApi(
-      "from_unixtime(cast(null as int))",
-      "null")
-  }
-
-  @Test
   def testIsDecimal(): Unit = {
     testSqlApi(
       "IS_DECIMAL('1')",
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
index e4b98c5..4dc61a5 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
@@ -28,13 +28,12 @@ import org.apache.flink.table.planner.utils.DateTimeTestUtil
 import org.apache.flink.table.planner.utils.DateTimeTestUtil._
 import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
 import org.apache.flink.types.Row
-
 import org.junit.Test
 
 import java.sql.Timestamp
 import java.text.SimpleDateFormat
 import java.time.{Instant, ZoneId}
-import java.util.TimeZone
+import java.util.{Locale, TimeZone}
 
 class TemporalTypesTest extends ExpressionTestBase {
 
@@ -742,85 +741,6 @@ class TemporalTypesTest extends ExpressionTestBase {
   }
 
   @Test
-  def testUTCTimeZone(): Unit = {
-    config.setLocalTimeZone(ZoneId.of("UTC"))
-
-    // Test Calcite's RexLiteral
-    // 1521025200000  =  UTC: 2018-03-14 11:00:00
-    testSqlApi("TIMESTAMP '2018-03-14 11:00:00'", "2018-03-14 11:00:00.000")
-
-    testSqlApi("DATE '2018-03-14'", "2018-03-14")
-    testSqlApi("TIME '19:20:21'", "19:20:21")
-
-    testSqlApi("TO_TIMESTAMP('2018-03-14 11:00:00', 'yyyy-MM-dd HH:mm:ss')",
-      "2018-03-14 11:00:00.000")
-    testSqlApi("TO_TIMESTAMP('2018-03-14 11:00:00')",
-      "2018-03-14 11:00:00.000")
-    testSqlApi("TO_TIMESTAMP(1521025200000)", "2018-03-14 11:00:00.000")
-
-    // 1521025200000  "2018-03-14T11:00:00+0000"
-    testSqlApi("FROM_UNIXTIME(1521025200)",
-      "2018-03-14 11:00:00")
-    testSqlApi("FROM_UNIXTIME(1521025200, 'yyyy-MM-dd HH:mm:ss')",
-      "2018-03-14 11:00:00")
-    testSqlApi("FROM_UNIXTIME(UNIX_TIMESTAMP(TO_TIMESTAMP('2018-03-14 11:00:00.0')))",
-      "2018-03-14 11:00:00")
-    testSqlApi("FROM_UNIXTIME(UNIX_TIMESTAMP('2018-03-14 11:00:00.0'))",
-      "2018-03-14 11:00:00")
-
-    // 1520960523000  "2018-03-13T17:02:03+0000"
-    testSqlApi("Date_ADD(TO_TIMESTAMP(1520960523000), 2)", "2018-03-15")
-    testSqlApi("Date_ADD(TO_TIMESTAMP('2018-03-13 17:02:03'), 2)", "2018-03-15")
-    testSqlApi("Date_ADD('2018-03-13 17:02:03', 2)", "2018-03-15")
-    testSqlApi("Date_SUB(TO_TIMESTAMP(1520960523000), 2)", "2018-03-11")
-    testSqlApi("Date_SUB(TO_TIMESTAMP('2018-03-13 17:02:03'), 2)", "2018-03-11")
-    testSqlApi("Date_SUB('2018-03-13 17:02:03', 2)", "2018-03-11")
-    testSqlApi("date_add('2017--10-11', 30)", "null")
-    testSqlApi("date_sub('2017--10-11', 30)", "null")
-
-    // DATE_DIFF
-    testSqlApi("DATEDIFF(TO_TIMESTAMP(1520960523000), '2018-03-13 17:02:03')", "0")
-    testSqlApi("DATEDIFF(TO_TIMESTAMP(1520827201000), TO_TIMESTAMP(1520740801000))", "1")
-
-    // DATE_FORMAT
-    // 1520960523000  "2018-03-13 17:02:03+0000"
-    testSqlApi("DATE_FORMAT('2018-03-13 17:02:03', 'yyyy-MM-dd HH:mm:ss', " +
-      "'yyyy/MM/dd HH:mm:ss')", "2018/03/13 17:02:03")
-    testSqlApi("DATE_FORMAT(TO_TIMESTAMP(1520960523000), 'yyyy-MM-dd HH:mm:ss')",
-      "2018-03-13 17:02:03")
-  }
-
-  @Test
-  def testDaylightSavingTimeZone(): Unit = {
-    config.setLocalTimeZone(ZoneId.of("America/New_York"))
-
-    // TODO: add more testcases & fully support DST
-    // Daylight Saving
-    // America/New_York:  -5:00,  -4:00(DST)
-    // 2018-03-11 02:00:00 -> 2018:-3-11 03:00:00
-    // 2018-11-04 02:00:00 -> 2018-11-04 01:00:00
-
-    // Test Calcite's RexLiteral
-    testSqlApi("TIMESTAMP '2018-03-14 07:00:00'", "2018-03-14 07:00:00.000")
-
-    testSqlApi("TO_TIMESTAMP('2018-03-14 07:00:00', 'yyyy-MM-dd HH:mm:ss')",
-      "2018-03-14 07:00:00.000")
-    testSqlApi("TO_TIMESTAMP('2018-03-14 07:00:00')",
-      "2018-03-14 07:00:00.000")
-    testSqlApi("f18", "2018-03-14 07:00:00.000")
-
-    testSqlApi("FROM_UNIXTIME(UNIX_TIMESTAMP(TO_TIMESTAMP('2018-03-14 07:00:00.0')))",
-      "2018-03-14 07:00:00")
-    testSqlApi("FROM_UNIXTIME(UNIX_TIMESTAMP('2018-03-14 07:00:00.0'))",
-      "2018-03-14 07:00:00")
-
-    // DATE_FORMAT
-    testSqlApi("DATE_FORMAT('2018-03-13 13:02:03', 'yyyy-MM-dd HH:mm:ss', " +
-      "'yyyy/MM/dd HH:mm:ss')", "2018/03/13 13:02:03")
-    testSqlApi("DATE_FORMAT(f19, 'yyyy-MM-dd HH:mm:ss')", "2018-03-13 13:02:03")
-  }
-
-  @Test
   def testHourUnitRangoonTimeZone(): Unit = {
     // Asia/Rangoon UTC Offset 6.5
     config.setLocalTimeZone(ZoneId.of("Asia/Rangoon"))
@@ -879,10 +799,63 @@ class TemporalTypesTest extends ExpressionTestBase {
                "2018-03-14 19:00:00")
   }
 
+  @Test
+  def testFromUnixTime(): Unit = {
+    val sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
+    val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS"
+    val sdf2 = new SimpleDateFormat(fmt2, Locale.US)
+    val fmt3 = "yy-MM-dd HH-mm-ss"
+    val sdf3 = new SimpleDateFormat(fmt3, Locale.US)
+
+    testSqlApi(
+      "from_unixtime(f21)",
+      sdf1.format(new Timestamp(44000)))
+    testSqlApi(
+      s"from_unixtime(f21, '$fmt2')",
+      sdf2.format(new Timestamp(44000)))
+    testSqlApi(
+      s"from_unixtime(f21, '$fmt3')",
+      sdf3.format(new Timestamp(44000)))
+
+    testSqlApi(
+      "from_unixtime(f22)",
+      sdf1.format(new Timestamp(3000)))
+    testSqlApi(
+      s"from_unixtime(f22, '$fmt2')",
+      sdf2.format(new Timestamp(3000)))
+    testSqlApi(
+      s"from_unixtime(f22, '$fmt3')",
+      sdf3.format(new Timestamp(3000)))
+
+    // test with null input
+    testSqlApi(
+      "from_unixtime(cast(null as int))",
+      "null")
+  }
+
+  @Test
+  def testFromUnixTimeInTokyo(): Unit = {
+    config.setLocalTimeZone(ZoneId.of("Asia/Tokyo"))
+    val fmt = "yy-MM-dd HH-mm-ss"
+    testSqlApi(
+      "from_unixtime(f21)",
+      "1970-01-01 09:00:44")
+    testSqlApi(
+      s"from_unixtime(f21, '$fmt')",
+      "70-01-01 09-00-44")
+
+    testSqlApi(
+      "from_unixtime(f22)",
+      "1970-01-01 09:00:03")
+    testSqlApi(
+      s"from_unixtime(f22, '$fmt')",
+      "70-01-01 09-00-03")
+  }
+
   // ----------------------------------------------------------------------------------------------
 
   override def testData: Row = {
-    val testData = new Row(21)
+    val testData = new Row(23)
     testData.setField(0, localDate("1990-10-14"))
     testData.setField(1, DateTimeTestUtil.localTime("10:20:45"))
     testData.setField(2, localDateTime("1990-10-14 10:20:45.123"))
@@ -908,6 +881,8 @@ class TemporalTypesTest extends ExpressionTestBase {
     testData.setField(18, Instant.ofEpochMilli(1521025200000L))
     testData.setField(19, Instant.ofEpochMilli(1520960523000L))
     testData.setField(20, Instant.ofEpochMilli(1520827201000L))
+    testData.setField(21, 44L)
+    testData.setField(22, 3)
     testData
   }
 
@@ -933,6 +908,8 @@ class TemporalTypesTest extends ExpressionTestBase {
       /* 17 */ Types.INSTANT,
       /* 18 */ Types.INSTANT,
       /* 19 */ Types.INSTANT,
-      /* 20 */ Types.INSTANT)
+      /* 20 */ Types.INSTANT,
+      /* 21 */ Types.LONG,
+      /* 22 */ Types.INT)
   }
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
index 50a33c3..17f1d6f 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
@@ -1158,14 +1158,6 @@ class CalcITCase extends BatchTestBase {
   }
 
   @Test
-  def testFromUnixTime(): Unit = {
-    checkResult("SELECT" +
-        " FROM_UNIXTIME(1513193130), FROM_UNIXTIME(1513193130, 'MM/dd/yyyy HH:mm:ss')" +
-        " FROM testTable WHERE a = TRUE",
-      Seq(row("2017-12-13 19:25:30", "12/13/2017 19:25:30")))
-  }
-
-  @Test
   def testToDate(): Unit = {
     checkResult("SELECT" +
         " TO_DATE(CAST(null AS VARCHAR))," +
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CorrelateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CorrelateITCase.scala
index fa6be95..91f7922 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CorrelateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CorrelateITCase.scala
@@ -214,7 +214,7 @@ class CorrelateITCase extends StreamingTestBase {
     val sql =
       """
         |SELECT * FROM TMP1
-        |where TIMESTAMP_ADD(day, 3, v) > DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd')
+        |where TIMESTAMPADD(day, 3, cast(v as date)) > DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd')
       """.stripMargin
 
     val sink = new TestingAppendSink