You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/08 12:38:29 UTC

[flink] 07/09: [FLINK-13561][table-planner-blink] Fix UNIX_TIMESTAMP(string [, format]) should work in session time zone

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bde51f9e46600154bd60321d3d345bb539b30ed1
Author: Jark Wu <im...@gmail.com>
AuthorDate: Wed Aug 7 13:41:21 2019 +0800

    [FLINK-13561][table-planner-blink] Fix UNIX_TIMESTAMP(string [,format]) should work in session time zone
    
    This aligns the behavior with other systems (MySQL, Spark). UNIX_TIMESTAMP(string [,format]) is the inverse of FROM_UNIXTIME(bigint [,format]). This commit also removes support for UNIX_TIMESTAMP(timestamp).
---
 .../functions/sql/FlinkSqlOperatorTable.java       |  3 +-
 .../planner/codegen/calls/BuiltInMethods.scala     |  9 -----
 .../planner/expressions/ScalarFunctionsTest.scala  |  2 --
 .../planner/expressions/TemporalTypesTest.scala    | 41 ++++++++++++++++++++++
 .../planner/runtime/batch/sql/CalcITCase.scala     |  9 -----
 5 files changed, 42 insertions(+), 22 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
index f78bd1c..b619eb5 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
@@ -632,8 +632,7 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable {
 		OperandTypes.or(
 			OperandTypes.NILADIC,
 			OperandTypes.family(SqlTypeFamily.STRING),
-			OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.STRING),
-			OperandTypes.family(SqlTypeFamily.TIMESTAMP)),
+			OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.STRING)),
 		SqlFunctionCategory.TIMEDATE) {
 
 		@Override
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
index c46d432..d40f9f0 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
@@ -268,19 +268,10 @@ object BuiltInMethods {
     classOf[SqlDateTimeUtils],
     "unixTimestamp",
     classOf[String],
-    classOf[String])
-
-  val UNIX_TIMESTAMP_FORMAT_TIME_ZONE = Types.lookupMethod(
-    classOf[SqlDateTimeUtils],
-    "unixTimestamp",
-    classOf[String],
     classOf[String],
     classOf[TimeZone])
 
   val UNIX_TIMESTAMP_STR = Types.lookupMethod(
-    classOf[SqlDateTimeUtils], "unixTimestamp", classOf[String])
-
-  val UNIX_TIMESTAMP_STR_TIME_ZONE = Types.lookupMethod(
     classOf[SqlDateTimeUtils], "unixTimestamp", classOf[String], classOf[TimeZone])
 
   val UNIX_TIMESTAMP = Types.lookupMethod(
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
index ad225f3..93de62e 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
@@ -25,9 +25,7 @@ import org.apache.flink.table.planner.expressions.utils.ScalarTypesTestBase
 import org.junit.Test
 
 import java.sql.Timestamp
-import java.text.SimpleDateFormat
 import java.time.ZoneId
-import java.util.Locale
 
 class ScalarFunctionsTest extends ScalarTypesTestBase {
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
index 4dc61a5..27965f9 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
@@ -741,6 +741,20 @@ class TemporalTypesTest extends ExpressionTestBase {
   }
 
   @Test
+  def testDaylightSavingTimeZone(): Unit = {
+    // test from MySQL
+    // https://dev.mysql.com/doc/refman/8.0/en/date-and-time-functions.html#function_unix-timestamp
+    // due to conventions for local time zone changes such as Daylight Saving Time (DST),
+    // it is possible for UNIX_TIMESTAMP() to map two values that are distinct in a non-UTC
+    // time zone to the same Unix timestamp value
+    config.setLocalTimeZone(ZoneId.of("MET")) // Europe/Amsterdam
+
+    testSqlApi("UNIX_TIMESTAMP('2005-03-27 03:00:00')", "1111885200")
+    testSqlApi("UNIX_TIMESTAMP('2005-03-27 02:00:00')", "1111885200")
+    testSqlApi("FROM_UNIXTIME(1111885200)", "2005-03-27 03:00:00")
+  }
+
+  @Test
   def testHourUnitRangoonTimeZone(): Unit = {
     // Asia/Rangoon UTC Offset 6.5
     config.setLocalTimeZone(ZoneId.of("Asia/Rangoon"))
@@ -852,6 +866,33 @@ class TemporalTypesTest extends ExpressionTestBase {
       "70-01-01 09-00-03")
   }
 
+  @Test
+  def testUnixTimestamp(): Unit = {
+    val ts1 = Timestamp.valueOf("2015-07-24 10:00:00.3")
+    val ts2 = Timestamp.valueOf("2015-07-25 02:02:02.2")
+    val s1 = "2015/07/24 10:00:00.5"
+    val s2 = "2015/07/25 02:02:02.6"
+    val ss1 = "2015-07-24 10:00:00"
+    val ss2 = "2015-07-25 02:02:02"
+    val fmt = "yyyy/MM/dd HH:mm:ss.S"
+
+    testSqlApi(s"UNIX_TIMESTAMP('$ss1')", (ts1.getTime / 1000L).toString)
+    testSqlApi(s"UNIX_TIMESTAMP('$ss2')", (ts2.getTime / 1000L).toString)
+    testSqlApi(s"UNIX_TIMESTAMP('$s1', '$fmt')", (ts1.getTime / 1000L).toString)
+    testSqlApi(s"UNIX_TIMESTAMP('$s2', '$fmt')", (ts2.getTime / 1000L).toString)
+  }
+
+  @Test
+  def testUnixTimestampInTokyo(): Unit = {
+    config.setLocalTimeZone(ZoneId.of("Asia/Tokyo"))
+    testSqlApi(
+      "UNIX_TIMESTAMP('2015-07-24 10:00:00')",
+      "1437699600")
+    testSqlApi(
+      "UNIX_TIMESTAMP('2015/07/24 10:00:00.5', 'yyyy/MM/dd HH:mm:ss.S')",
+      "1437699600")
+  }
+
   // ----------------------------------------------------------------------------------------------
 
   override def testData: Row = {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
index 17f1d6f..63499a4 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
@@ -1149,15 +1149,6 @@ class CalcITCase extends BatchTestBase {
   }
 
   @Test
-  def testUnixTimestamp(): Unit = {
-    checkResult("SELECT" +
-        " UNIX_TIMESTAMP('2017-12-13 19:25:30')," +
-        " UNIX_TIMESTAMP('2017-12-13 19:25:30', 'yyyy-MM-dd HH:mm:ss')" +
-        " FROM testTable WHERE a = TRUE",
-      Seq(row(1513193130, 1513193130)))
-  }
-
-  @Test
   def testToDate(): Unit = {
     checkResult("SELECT" +
         " TO_DATE(CAST(null AS VARCHAR))," +