You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2023/04/25 12:02:10 UTC

[kyuubi] branch master updated: [KYUUBI #4745] Support Flink's LocalZonedTimestamp DataType

This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 79d6645fc [KYUUBI #4745] Support Flink's LocalZonedTimestamp DataType
79d6645fc is described below

commit 79d6645fcd65cbbb55bb296827ab5cf33823bfd2
Author: Paul Lin <pa...@gmail.com>
AuthorDate: Tue Apr 25 20:01:59 2023 +0800

    [KYUUBI #4745] Support Flink's LocalZonedTimestamp DataType
    
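    ### _Why are the changes needed?_
    
    Columns of Flink's `LocalZonedTimestampType` were previously rejected by the Flink engine with "Flink data type ... is not supported currently". This change maps the type to `TIMESTAMPLOCALTZ_TYPE` and renders its values as strings in the session's `table.local-time-zone`, falling back to the system default zone when that option is not set.
    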
    ### _How was this patch tested?_
    - [X] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [X] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #4751 from link3280/KYUUBI-4745.
    
    Closes #4745
    
    e1e900bbe [Paul Lin] [KYUUBI #4745] Replace hive's timestamp format with the kyuubi's
    0693d1f15 [Paul Lin] [KYUUBI #4745] Pin time zone in tests
    462b39f2f [Paul Lin] [KYUUBI #4745] Improve variable naming
    5f9976d81 [Paul Lin] [KYUUBI #4745] Support Flink's LocalZonedTimestamp DataType
    
    Authored-by: Paul Lin <pa...@gmail.com>
    Signed-off-by: Cheng Pan <ch...@apache.org>
---
 .../engine/flink/operation/FlinkOperation.scala    |  7 +++
 .../apache/kyuubi/engine/flink/schema/RowSet.scala | 63 ++++++++++++++++++----
 .../flink/operation/FlinkOperationSuite.scala      | 17 ++++++
 .../engine/flink/result/ResultSetSuite.scala       | 11 ++--
 4 files changed, 84 insertions(+), 14 deletions(-)

diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
index eee2fdc98..422ae7d4b 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
@@ -18,6 +18,7 @@
 package org.apache.kyuubi.engine.flink.operation
 
 import java.io.IOException
+import java.time.ZoneId
 
 import scala.collection.JavaConverters.collectionAsScalaIterableConverter
 
@@ -100,9 +101,15 @@ abstract class FlinkOperation(session: Session) extends AbstractOperation(sessio
       case FETCH_FIRST => resultSet.getData.fetchAbsolute(0);
     }
     val token = resultSet.getData.take(rowSetSize)
+    val timeZone = Option(flinkSession.getSessionConfig.get("table.local-time-zone"))
+    val zoneId = timeZone match {
+      case Some(tz) => ZoneId.of(tz)
+      case None => ZoneId.systemDefault()
+    }
     val resultRowSet = RowSet.resultSetToTRowSet(
       token.toList,
       resultSet,
+      zoneId,
       getProtocolVersion)
     resultRowSet.setStartRowOffset(resultSet.getData.getPosition)
     resultRowSet
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/RowSet.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/RowSet.scala
index 13cf5e717..c446396d5 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/RowSet.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/RowSet.scala
@@ -21,7 +21,9 @@ import java.{lang, util}
 import java.nio.ByteBuffer
 import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}
-import java.time.{LocalDate, LocalDateTime}
+import java.time.{Instant, LocalDate, LocalDateTime, ZonedDateTime, ZoneId}
+import java.time.format.{DateTimeFormatter, DateTimeFormatterBuilder, TextStyle}
+import java.time.temporal.ChronoField
 import java.util.Collections
 
 import scala.collection.JavaConverters._
@@ -42,15 +44,16 @@ object RowSet {
   def resultSetToTRowSet(
       rows: Seq[Row],
       resultSet: ResultSet,
+      zoneId: ZoneId,
       protocolVersion: TProtocolVersion): TRowSet = {
     if (protocolVersion.getValue < TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6.getValue) {
-      toRowBaseSet(rows, resultSet)
+      toRowBaseSet(rows, resultSet, zoneId)
     } else {
-      toColumnBasedSet(rows, resultSet)
+      toColumnBasedSet(rows, resultSet, zoneId)
     }
   }
 
-  def toRowBaseSet(rows: Seq[Row], resultSet: ResultSet): TRowSet = {
+  def toRowBaseSet(rows: Seq[Row], resultSet: ResultSet, zoneId: ZoneId): TRowSet = {
     val rowSize = rows.size
     val tRows = new util.ArrayList[TRow](rowSize)
     var i = 0
@@ -60,7 +63,7 @@ object RowSet {
       val columnSize = row.getArity
       var j = 0
       while (j < columnSize) {
-        val columnValue = toTColumnValue(j, row, resultSet)
+        val columnValue = toTColumnValue(j, row, resultSet, zoneId)
         tRow.addToColVals(columnValue)
         j += 1
       }
@@ -71,14 +74,14 @@ object RowSet {
     new TRowSet(0, tRows)
   }
 
-  def toColumnBasedSet(rows: Seq[Row], resultSet: ResultSet): TRowSet = {
+  def toColumnBasedSet(rows: Seq[Row], resultSet: ResultSet, zoneId: ZoneId): TRowSet = {
     val size = rows.length
     val tRowSet = new TRowSet(0, new util.ArrayList[TRow](size))
     val columnSize = resultSet.getColumns.size()
     var i = 0
     while (i < columnSize) {
       val field = resultSet.getColumns.get(i)
-      val tColumn = toTColumn(rows, i, field.getDataType.getLogicalType)
+      val tColumn = toTColumn(rows, i, field.getDataType.getLogicalType, zoneId)
       tRowSet.addToColumns(tColumn)
       i += 1
     }
@@ -88,7 +91,8 @@ object RowSet {
   private def toTColumnValue(
       ordinal: Int,
       row: Row,
-      resultSet: ResultSet): TColumnValue = {
+      resultSet: ResultSet,
+      zoneId: ZoneId): TColumnValue = {
 
     val column = resultSet.getColumns.get(ordinal)
     val logicalType = column.getDataType.getLogicalType
@@ -153,6 +157,12 @@ object RowSet {
                 s"for type ${t.getClass}.")
         }
         TColumnValue.stringVal(tStringValue)
+      case _: LocalZonedTimestampType =>
+        val tStringValue = new TStringValue
+        val fieldValue = row.getField(ordinal)
+        tStringValue.setValue(TIMESTAMP_LZT_FORMATTER.format(
+          ZonedDateTime.ofInstant(fieldValue.asInstanceOf[Instant], zoneId)))
+        TColumnValue.stringVal(tStringValue)
       case t =>
         val tStringValue = new TStringValue
         if (row.getField(ordinal) != null) {
@@ -166,7 +176,11 @@ object RowSet {
     ByteBuffer.wrap(bitSet.toByteArray)
   }
 
-  private def toTColumn(rows: Seq[Row], ordinal: Int, logicalType: LogicalType): TColumn = {
+  private def toTColumn(
+      rows: Seq[Row],
+      ordinal: Int,
+      logicalType: LogicalType,
+      zoneId: ZoneId): TColumn = {
     val nulls = new java.util.BitSet()
     // for each column, determine the conversion class by sampling the first non-value value
     // if there's no row, set the entire column empty
@@ -211,6 +225,12 @@ object RowSet {
                 s"for type ${t.getClass}.")
         }
         TColumn.stringVal(new TStringColumn(values, nulls))
+      case _: LocalZonedTimestampType =>
+        val values = getOrSetAsNull[Instant](rows, ordinal, nulls, Instant.EPOCH)
+          .toArray().map(v =>
+            TIMESTAMP_LZT_FORMATTER.format(
+              ZonedDateTime.ofInstant(v.asInstanceOf[Instant], zoneId)))
+        TColumn.stringVal(new TStringColumn(values.toList.asJava, nulls))
       case _ =>
         var i = 0
         val rowSize = rows.length
@@ -303,13 +323,14 @@ object RowSet {
     case _: DecimalType => TTypeId.DECIMAL_TYPE
     case _: DateType => TTypeId.DATE_TYPE
     case _: TimestampType => TTypeId.TIMESTAMP_TYPE
+    case _: LocalZonedTimestampType => TTypeId.TIMESTAMPLOCALTZ_TYPE
     case _: ArrayType => TTypeId.ARRAY_TYPE
     case _: MapType => TTypeId.MAP_TYPE
     case _: RowType => TTypeId.STRUCT_TYPE
     case _: BinaryType => TTypeId.BINARY_TYPE
     case _: VarBinaryType => TTypeId.BINARY_TYPE
     case _: TimeType => TTypeId.STRING_TYPE
-    case t @ (_: ZonedTimestampType | _: LocalZonedTimestampType | _: MultisetType |
+    case t @ (_: ZonedTimestampType | _: MultisetType |
         _: YearMonthIntervalType | _: DayTimeIntervalType) =>
       throw new IllegalArgumentException(
         "Flink data type `%s` is not supported currently".format(t.asSummaryString()),
@@ -377,4 +398,26 @@ object RowSet {
         other.toString
     }
   }
+
+  /** should stay in sync with org.apache.kyuubi.jdbc.hive.common.TimestampTZUtil */
+  var TIMESTAMP_LZT_FORMATTER: DateTimeFormatter = {
+    val builder = new DateTimeFormatterBuilder
+    // Date part
+    builder.append(DateTimeFormatter.ofPattern("yyyy-MM-dd"))
+    // Time part
+    builder
+      .optionalStart
+      .appendLiteral(" ")
+      .append(DateTimeFormatter.ofPattern("HH:mm:ss"))
+      .optionalStart
+      .appendFraction(ChronoField.NANO_OF_SECOND, 1, 9, true)
+      .optionalEnd
+      .optionalEnd
+
+    // Zone part
+    builder.optionalStart.appendLiteral(" ").optionalEnd
+    builder.optionalStart.appendZoneText(TextStyle.NARROW).optionalEnd
+
+    builder.toFormatter
+  }
 }
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
index 908e407a9..00e26c528 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
@@ -739,6 +739,23 @@ abstract class FlinkOperationSuite extends HiveJDBCTestHelper with WithFlinkTest
     }
   }
 
+  test("execute statement - select timestamp with local time zone") {
+    withJdbcStatement() { statement =>
+      statement.executeQuery("CREATE VIEW T1 AS SELECT TO_TIMESTAMP_LTZ(4001, 3)")
+      statement.executeQuery("SET 'table.local-time-zone' = 'UTC'")
+      val resultSetUTC = statement.executeQuery("SELECT * FROM T1")
+      val metaData = resultSetUTC.getMetaData
+      assert(metaData.getColumnType(1) === java.sql.Types.OTHER)
+      assert(resultSetUTC.next())
+      assert(resultSetUTC.getString(1) === "1970-01-01 00:00:04.001 UTC")
+
+      statement.executeQuery("SET 'table.local-time-zone' = 'America/Los_Angeles'")
+      val resultSetPST = statement.executeQuery("SELECT * FROM T1")
+      assert(resultSetPST.next())
+      assert(resultSetPST.getString(1) === "1969-12-31 16:00:04.001 America/Los_Angeles")
+    }
+  }
+
   test("execute statement - select time") {
     withJdbcStatement() { statement =>
       val resultSet =
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/result/ResultSetSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/result/ResultSetSuite.scala
index 9190456b3..9ee5c658b 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/result/ResultSetSuite.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/result/ResultSetSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.kyuubi.engine.flink.result
 
+import java.time.ZoneId
+
 import org.apache.flink.table.api.{DataTypes, ResultKind}
 import org.apache.flink.table.catalog.Column
 import org.apache.flink.table.data.StringData
@@ -44,9 +46,10 @@ class ResultSetSuite extends KyuubiFunSuite {
       .data(rowsNew)
       .build
 
-    assert(RowSet.toRowBaseSet(rowsNew, resultSetNew)
-      === RowSet.toRowBaseSet(rowsOld, resultSetOld))
-    assert(RowSet.toColumnBasedSet(rowsNew, resultSetNew)
-      === RowSet.toColumnBasedSet(rowsOld, resultSetOld))
+    val timeZone = ZoneId.of("America/Los_Angeles")
+    assert(RowSet.toRowBaseSet(rowsNew, resultSetNew, timeZone)
+      === RowSet.toRowBaseSet(rowsOld, resultSetOld, timeZone))
+    assert(RowSet.toColumnBasedSet(rowsNew, resultSetNew, timeZone)
+      === RowSet.toColumnBasedSet(rowsOld, resultSetOld, timeZone))
   }
 }