You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2023/04/25 12:02:10 UTC
[kyuubi] branch master updated: [KYUUBI #4745] Support Flink's LocalZonedTimestamp DataType
This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 79d6645fc [KYUUBI #4745] Support Flink's LocalZonedTimestamp DataType
79d6645fc is described below
commit 79d6645fcd65cbbb55bb296827ab5cf33823bfd2
Author: Paul Lin <pa...@gmail.com>
AuthorDate: Tue Apr 25 20:01:59 2023 +0800
[KYUUBI #4745] Support Flink's LocalZonedTimestamp DataType
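### _Why are the changes needed?_
Flink's `TIMESTAMP_LTZ` (`LocalZonedTimestampType`) columns were previously rejected as an unsupported data type. This change maps them to Hive's `TIMESTAMPLOCALTZ_TYPE`. Their values are rendered in the session's `table.local-time-zone`, falling back to the JVM's default time zone when that option is not set.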
### _How was this patch tested?_
- [X] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [X] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #4751 from link3280/KYUUBI-4745.
Closes #4745
e1e900bbe [Paul Lin] [KYUUBI #4745] Replace hive's timestamp format with the kyuubi's
0693d1f15 [Paul Lin] [KYUUBI #4745] Pin time zone in tests
462b39f2f [Paul Lin] [KYUUBI #4745] Improve variable naming
5f9976d81 [Paul Lin] [KYUUBI #4745] Support Flink's LocalZonedTimestamp DataType
Authored-by: Paul Lin <pa...@gmail.com>
Signed-off-by: Cheng Pan <ch...@apache.org>
---
.../engine/flink/operation/FlinkOperation.scala | 7 +++
.../apache/kyuubi/engine/flink/schema/RowSet.scala | 63 ++++++++++++++++++----
.../flink/operation/FlinkOperationSuite.scala | 17 ++++++
.../engine/flink/result/ResultSetSuite.scala | 11 ++--
4 files changed, 84 insertions(+), 14 deletions(-)
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
index eee2fdc98..422ae7d4b 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
@@ -18,6 +18,7 @@
package org.apache.kyuubi.engine.flink.operation
import java.io.IOException
+import java.time.ZoneId
import scala.collection.JavaConverters.collectionAsScalaIterableConverter
@@ -100,9 +101,15 @@ abstract class FlinkOperation(session: Session) extends AbstractOperation(sessio
case FETCH_FIRST => resultSet.getData.fetchAbsolute(0);
}
val token = resultSet.getData.take(rowSetSize)
+ // "default" is Flink's sentinel for the JVM's system time zone, not a valid zone id
+ val zoneId = Option(flinkSession.getSessionConfig.get("table.local-time-zone"))
+ .filterNot(_ == "default")
+ .map(tz => ZoneId.of(tz))
+ .getOrElse(ZoneId.systemDefault())
val resultRowSet = RowSet.resultSetToTRowSet(
token.toList,
resultSet,
+ zoneId,
getProtocolVersion)
resultRowSet.setStartRowOffset(resultSet.getData.getPosition)
resultRowSet
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/RowSet.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/RowSet.scala
index 13cf5e717..c446396d5 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/RowSet.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/RowSet.scala
@@ -21,7 +21,9 @@ import java.{lang, util}
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.sql.{Date, Timestamp}
-import java.time.{LocalDate, LocalDateTime}
+import java.time.{Instant, LocalDate, LocalDateTime, ZonedDateTime, ZoneId}
+import java.time.format.{DateTimeFormatter, DateTimeFormatterBuilder, TextStyle}
+import java.time.temporal.ChronoField
import java.util.Collections
import scala.collection.JavaConverters._
@@ -42,15 +44,16 @@ object RowSet {
def resultSetToTRowSet(
rows: Seq[Row],
resultSet: ResultSet,
+ zoneId: ZoneId,
protocolVersion: TProtocolVersion): TRowSet = {
if (protocolVersion.getValue < TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6.getValue) {
- toRowBaseSet(rows, resultSet)
+ toRowBaseSet(rows, resultSet, zoneId)
} else {
- toColumnBasedSet(rows, resultSet)
+ toColumnBasedSet(rows, resultSet, zoneId)
}
}
- def toRowBaseSet(rows: Seq[Row], resultSet: ResultSet): TRowSet = {
+ def toRowBaseSet(rows: Seq[Row], resultSet: ResultSet, zoneId: ZoneId): TRowSet = {
val rowSize = rows.size
val tRows = new util.ArrayList[TRow](rowSize)
var i = 0
@@ -60,7 +63,7 @@ object RowSet {
val columnSize = row.getArity
var j = 0
while (j < columnSize) {
- val columnValue = toTColumnValue(j, row, resultSet)
+ val columnValue = toTColumnValue(j, row, resultSet, zoneId)
tRow.addToColVals(columnValue)
j += 1
}
@@ -71,14 +74,14 @@ object RowSet {
new TRowSet(0, tRows)
}
- def toColumnBasedSet(rows: Seq[Row], resultSet: ResultSet): TRowSet = {
+ def toColumnBasedSet(rows: Seq[Row], resultSet: ResultSet, zoneId: ZoneId): TRowSet = {
val size = rows.length
val tRowSet = new TRowSet(0, new util.ArrayList[TRow](size))
val columnSize = resultSet.getColumns.size()
var i = 0
while (i < columnSize) {
val field = resultSet.getColumns.get(i)
- val tColumn = toTColumn(rows, i, field.getDataType.getLogicalType)
+ val tColumn = toTColumn(rows, i, field.getDataType.getLogicalType, zoneId)
tRowSet.addToColumns(tColumn)
i += 1
}
@@ -88,7 +91,8 @@ object RowSet {
private def toTColumnValue(
ordinal: Int,
row: Row,
- resultSet: ResultSet): TColumnValue = {
+ resultSet: ResultSet,
+ zoneId: ZoneId): TColumnValue = {
val column = resultSet.getColumns.get(ordinal)
val logicalType = column.getDataType.getLogicalType
@@ -153,6 +157,12 @@ object RowSet {
s"for type ${t.getClass}.")
}
TColumnValue.stringVal(tStringValue)
+ case _: LocalZonedTimestampType =>
+ val tStringValue = new TStringValue
+ val fieldValue = row.getField(ordinal) // null for SQL NULL: leave the value unset
+ if (fieldValue != null) tStringValue.setValue(TIMESTAMP_LZT_FORMATTER.format(
+ ZonedDateTime.ofInstant(fieldValue.asInstanceOf[Instant], zoneId)))
+ TColumnValue.stringVal(tStringValue)
case t =>
val tStringValue = new TStringValue
if (row.getField(ordinal) != null) {
@@ -166,7 +176,11 @@ object RowSet {
ByteBuffer.wrap(bitSet.toByteArray)
}
- private def toTColumn(rows: Seq[Row], ordinal: Int, logicalType: LogicalType): TColumn = {
+ private def toTColumn(
+ rows: Seq[Row],
+ ordinal: Int,
+ logicalType: LogicalType,
+ zoneId: ZoneId): TColumn = {
val nulls = new java.util.BitSet()
// for each column, determine the conversion class by sampling the first non-value value
// if there's no row, set the entire column empty
@@ -211,6 +225,12 @@ object RowSet {
s"for type ${t.getClass}.")
}
TColumn.stringVal(new TStringColumn(values, nulls))
+ case _: LocalZonedTimestampType =>
+ val values = getOrSetAsNull[Instant](rows, ordinal, nulls, Instant.EPOCH)
+ .toArray().map(v =>
+ TIMESTAMP_LZT_FORMATTER.format(
+ ZonedDateTime.ofInstant(v.asInstanceOf[Instant], zoneId)))
+ TColumn.stringVal(new TStringColumn(values.toList.asJava, nulls))
case _ =>
var i = 0
val rowSize = rows.length
@@ -303,13 +323,14 @@ object RowSet {
case _: DecimalType => TTypeId.DECIMAL_TYPE
case _: DateType => TTypeId.DATE_TYPE
case _: TimestampType => TTypeId.TIMESTAMP_TYPE
+ case _: LocalZonedTimestampType => TTypeId.TIMESTAMPLOCALTZ_TYPE
case _: ArrayType => TTypeId.ARRAY_TYPE
case _: MapType => TTypeId.MAP_TYPE
case _: RowType => TTypeId.STRUCT_TYPE
case _: BinaryType => TTypeId.BINARY_TYPE
case _: VarBinaryType => TTypeId.BINARY_TYPE
case _: TimeType => TTypeId.STRING_TYPE
- case t @ (_: ZonedTimestampType | _: LocalZonedTimestampType | _: MultisetType |
+ case t @ (_: ZonedTimestampType | _: MultisetType |
_: YearMonthIntervalType | _: DayTimeIntervalType) =>
throw new IllegalArgumentException(
"Flink data type `%s` is not supported currently".format(t.asSummaryString()),
@@ -377,4 +398,26 @@ object RowSet {
other.toString
}
}
+
+ /** Shared, immutable formatter for TIMESTAMP_LTZ values; keep in sync with org.apache.kyuubi.jdbc.hive.common.TimestampTZUtil */
+ val TIMESTAMP_LZT_FORMATTER: DateTimeFormatter = {
+ val builder = new DateTimeFormatterBuilder
+ // Date part: always present
+ builder.append(DateTimeFormatter.ofPattern("yyyy-MM-dd"))
+ // Optional time part, with optional 1-9 digit fractional seconds
+ builder
+ .optionalStart
+ .appendLiteral(" ")
+ .append(DateTimeFormatter.ofPattern("HH:mm:ss"))
+ .optionalStart
+ .appendFraction(ChronoField.NANO_OF_SECOND, 1, 9, true)
+ .optionalEnd
+ .optionalEnd
+
+ // Optional zone part, separated from the time by a space
+ builder.optionalStart.appendLiteral(" ").optionalEnd
+ builder.optionalStart.appendZoneText(TextStyle.NARROW).optionalEnd
+
+ builder.toFormatter
+ }
}
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
index 908e407a9..00e26c528 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
@@ -739,6 +739,23 @@ abstract class FlinkOperationSuite extends HiveJDBCTestHelper with WithFlinkTest
}
}
+ test("execute statement - select timestamp with local time zone") {
+ withJdbcStatement() { statement =>
+ statement.executeQuery("CREATE VIEW T1 AS SELECT TO_TIMESTAMP_LTZ(4001, 3)")
+ statement.executeQuery("SET 'table.local-time-zone' = 'UTC'")
+ val resultSetUTC = statement.executeQuery("SELECT * FROM T1")
+ val metaData = resultSetUTC.getMetaData
+ assert(metaData.getColumnType(1) === java.sql.Types.OTHER)
+ assert(resultSetUTC.next())
+ assert(resultSetUTC.getString(1) === "1970-01-01 00:00:04.001 UTC")
+
+ statement.executeQuery("SET 'table.local-time-zone' = 'America/Los_Angeles'")
+ val resultSetPST = statement.executeQuery("SELECT * FROM T1")
+ assert(resultSetPST.next())
+ assert(resultSetPST.getString(1) === "1969-12-31 16:00:04.001 America/Los_Angeles")
+ }
+ }
+
test("execute statement - select time") {
withJdbcStatement() { statement =>
val resultSet =
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/result/ResultSetSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/result/ResultSetSuite.scala
index 9190456b3..9ee5c658b 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/result/ResultSetSuite.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/result/ResultSetSuite.scala
@@ -17,6 +17,8 @@
package org.apache.kyuubi.engine.flink.result
+import java.time.ZoneId
+
import org.apache.flink.table.api.{DataTypes, ResultKind}
import org.apache.flink.table.catalog.Column
import org.apache.flink.table.data.StringData
@@ -44,9 +46,10 @@ class ResultSetSuite extends KyuubiFunSuite {
.data(rowsNew)
.build
- assert(RowSet.toRowBaseSet(rowsNew, resultSetNew)
- === RowSet.toRowBaseSet(rowsOld, resultSetOld))
- assert(RowSet.toColumnBasedSet(rowsNew, resultSetNew)
- === RowSet.toColumnBasedSet(rowsOld, resultSetOld))
+ val timeZone = ZoneId.of("America/Los_Angeles")
+ assert(RowSet.toRowBaseSet(rowsNew, resultSetNew, timeZone)
+ === RowSet.toRowBaseSet(rowsOld, resultSetOld, timeZone))
+ assert(RowSet.toColumnBasedSet(rowsNew, resultSetNew, timeZone)
+ === RowSet.toColumnBasedSet(rowsOld, resultSetOld, timeZone))
}
}