You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2023/02/14 10:59:58 UTC
[kyuubi] branch master updated: [KYUUBI #4316] Fix returned Timestamp values may lose precision
This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 8fe794709 [KYUUBI #4316] Fix returned Timestamp values may lose precision
8fe794709 is described below
commit 8fe794709bb2a3e28baa6dccf89f62b1dac5c285
Author: Cheng Pan <ch...@apache.org>
AuthorDate: Tue Feb 14 10:54:01 2023 +0000
[KYUUBI #4316] Fix returned Timestamp values may lose precision
### _Why are the changes needed?_
This PR proposes replacing `org.apache.kyuubi.engine.spark.schema#toHiveString` with `org.apache.spark.sql.execution#toHiveString` so that the returned results are consistent with `spark-sql` and `STS`.
Because of [SPARK-32006](https://issues.apache.org/jira/browse/SPARK-32006), it only works with Spark 3.1 and above.
The patch takes effect for both the Thrift and Arrow result formats.
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [x] Add screenshots for manual tests if appropriate
```
➜ ~ beeline -u 'jdbc:hive2://0.0.0.0:10009/default'
Connecting to jdbc:hive2://0.0.0.0:10009/default
Connected to: Spark SQL (version 3.3.1)
Driver: Hive JDBC (version 2.3.9)
Transaction isolation: TRANSACTION_REPEATABLE_READ
Beeline version 2.3.9 by Apache Hive
0: jdbc:hive2://0.0.0.0:10009/default> select to_timestamp('2023-02-08 22:17:33.123456789');
+----------------------------------------------+
| to_timestamp(2023-02-08 22:17:33.123456789) |
+----------------------------------------------+
| 2023-02-08 22:17:33.123456 |
+----------------------------------------------+
1 row selected (0.415 seconds)
```
- [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #4318 from pan3793/hive-string.
Closes #4316
ba9016f6 [Cheng Pan] nit
8be774b4 [Cheng Pan] nit
bd696fe3 [Cheng Pan] nit
b5cf051c [Cheng Pan] fix
dd6b7021 [Cheng Pan] test
63edd34d [Cheng Pan] nit
37cc70af [Cheng Pan] Fix python ut
c66ad22d [Cheng Pan] [KYUUBI #4316] Fix returned Timestamp values may lose precision
41d94445 [Cheng Pan] Revert "[KYUUBI #3958] Fix Spark session timezone format"
Authored-by: Cheng Pan <ch...@apache.org>
Signed-off-by: Cheng Pan <ch...@apache.org>
---
.../engine/spark/operation/ExecutePython.scala | 14 ++--
.../engine/spark/operation/SparkOperation.scala | 51 ++++++++------
.../apache/kyuubi/engine/spark/schema/RowSet.scala | 82 +++-------------------
.../spark/sql/kyuubi/SparkDatasetHelper.scala | 10 +--
.../kyuubi/engine/spark/schema/RowSetSuite.scala | 31 ++++----
.../scala/org/apache/kyuubi/util/RowSetUtils.scala | 64 +++--------------
.../kyuubi/operation/SparkDataTypeTests.scala | 35 +++++++--
.../jdbc/hive/arrow/ArrowColumnarBatchRow.java | 16 +++--
.../kyuubi/engine/spark/SparkSqlEngineSuite.scala | 4 +-
9 files changed, 123 insertions(+), 184 deletions(-)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
index e48ff6e5b..d2627fd99 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
@@ -88,9 +88,9 @@ class ExecutePython(
val output = response.map(_.content.getOutput()).getOrElse("")
val ename = response.map(_.content.getEname()).getOrElse("")
val evalue = response.map(_.content.getEvalue()).getOrElse("")
- val traceback = response.map(_.content.getTraceback()).getOrElse(Array.empty)
+ val traceback = response.map(_.content.getTraceback()).getOrElse(Seq.empty)
iter =
- new ArrayFetchIterator[Row](Array(Row(output, status, ename, evalue, Row(traceback: _*))))
+ new ArrayFetchIterator[Row](Array(Row(output, status, ename, evalue, traceback)))
setState(OperationState.FINISHED)
} else {
throw KyuubiSQLException(s"Interpret error:\n$statement\n $response")
@@ -210,7 +210,7 @@ case class SessionPythonWorker(
stdin.flush()
val pythonResponse = Option(stdout.readLine()).map(ExecutePython.fromJson[PythonResponse](_))
// throw exception if internal python code fail
- if (internal && pythonResponse.map(_.content.status) != Some(PythonResponse.OK_STATUS)) {
+ if (internal && !pythonResponse.map(_.content.status).contains(PythonResponse.OK_STATUS)) {
throw KyuubiSQLException(s"Internal python code $code failure: $pythonResponse")
}
pythonResponse
@@ -328,7 +328,7 @@ object ExecutePython extends Logging {
}
// for test
- def defaultSparkHome(): String = {
+ def defaultSparkHome: String = {
val homeDirFilter: FilenameFilter = (dir: File, name: String) =>
dir.isDirectory && name.contains("spark-") && !name.contains("-engine")
// get from kyuubi-server/../externals/kyuubi-download/target
@@ -418,7 +418,7 @@ case class PythonResponseContent(
data: Map[String, String],
ename: String,
evalue: String,
- traceback: Array[String],
+ traceback: Seq[String],
status: String) {
def getOutput(): String = {
Option(data)
@@ -431,7 +431,7 @@ case class PythonResponseContent(
def getEvalue(): String = {
Option(evalue).getOrElse("")
}
- def getTraceback(): Array[String] = {
- Option(traceback).getOrElse(Array.empty)
+ def getTraceback(): Seq[String] = {
+ Option(traceback).getOrElse(Seq.empty)
}
}
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
index b62ef6745..06884534d 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
@@ -24,6 +24,7 @@ import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TProgressU
import org.apache.spark.kyuubi.{SparkProgressMonitor, SQLOperationListener}
import org.apache.spark.kyuubi.SparkUtilsHelper.redact
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.{KyuubiSQLException, Utils}
@@ -135,27 +136,35 @@ abstract class SparkOperation(session: Session)
spark.sparkContext.setLocalProperty
protected def withLocalProperties[T](f: => T): T = {
- try {
- spark.sparkContext.setJobGroup(statementId, redactedStatement, forceCancel)
- spark.sparkContext.setLocalProperty(KYUUBI_SESSION_USER_KEY, session.user)
- spark.sparkContext.setLocalProperty(KYUUBI_STATEMENT_ID_KEY, statementId)
- schedulerPool match {
- case Some(pool) =>
- spark.sparkContext.setLocalProperty(SPARK_SCHEDULER_POOL_KEY, pool)
- case None =>
- }
- if (isSessionUserSignEnabled) {
- setSessionUserSign()
- }
+ SQLConf.withExistingConf(spark.sessionState.conf) {
+ val originalSession = SparkSession.getActiveSession
+ try {
+ SparkSession.setActiveSession(spark)
+ spark.sparkContext.setJobGroup(statementId, redactedStatement, forceCancel)
+ spark.sparkContext.setLocalProperty(KYUUBI_SESSION_USER_KEY, session.user)
+ spark.sparkContext.setLocalProperty(KYUUBI_STATEMENT_ID_KEY, statementId)
+ schedulerPool match {
+ case Some(pool) =>
+ spark.sparkContext.setLocalProperty(SPARK_SCHEDULER_POOL_KEY, pool)
+ case None =>
+ }
+ if (isSessionUserSignEnabled) {
+ setSessionUserSign()
+ }
- f
- } finally {
- spark.sparkContext.setLocalProperty(SPARK_SCHEDULER_POOL_KEY, null)
- spark.sparkContext.setLocalProperty(KYUUBI_SESSION_USER_KEY, null)
- spark.sparkContext.setLocalProperty(KYUUBI_STATEMENT_ID_KEY, null)
- spark.sparkContext.clearJobGroup()
- if (isSessionUserSignEnabled) {
- clearSessionUserSign()
+ f
+ } finally {
+ spark.sparkContext.setLocalProperty(SPARK_SCHEDULER_POOL_KEY, null)
+ spark.sparkContext.setLocalProperty(KYUUBI_SESSION_USER_KEY, null)
+ spark.sparkContext.setLocalProperty(KYUUBI_STATEMENT_ID_KEY, null)
+ spark.sparkContext.clearJobGroup()
+ if (isSessionUserSignEnabled) {
+ clearSessionUserSign()
+ }
+ originalSession match {
+ case Some(session) => SparkSession.setActiveSession(session)
+ case None => SparkSession.clearActiveSession()
+ }
}
}
}
@@ -246,7 +255,7 @@ abstract class SparkOperation(session: Session)
} else {
val taken = iter.take(rowSetSize)
RowSet.toTRowSet(
- taken.toList.asInstanceOf[List[Row]],
+ taken.toSeq.asInstanceOf[Seq[Row]],
resultSchema,
getProtocolVersion,
timeZone)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/RowSet.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/RowSet.scala
index 8cc88156b..7be70403d 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/RowSet.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/RowSet.scala
@@ -18,22 +18,25 @@
package org.apache.kyuubi.engine.spark.schema
import java.nio.ByteBuffer
-import java.nio.charset.StandardCharsets
-import java.sql.Timestamp
-import java.time._
-import java.util.Date
+import java.time.ZoneId
import scala.collection.JavaConverters._
import org.apache.hive.service.rpc.thrift._
import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.HiveResult
import org.apache.spark.sql.types._
-import org.apache.kyuubi.engine.spark.schema.SchemaHelper.TIMESTAMP_NTZ
import org.apache.kyuubi.util.RowSetUtils._
object RowSet {
+ def toHiveString(valueAndType: (Any, DataType), nested: Boolean = false): String = {
+ // compatible w/ Spark 3.1 and above
+ val timeFormatters = HiveResult.getTimeFormatters
+ HiveResult.toHiveString(valueAndType, nested, timeFormatters)
+ }
+
def toTRowSet(
bytes: Array[Byte],
protocolVersion: TProtocolVersion): TRowSet = {
@@ -68,9 +71,9 @@ object RowSet {
}
def toRowBasedSet(rows: Seq[Row], schema: StructType, timeZone: ZoneId): TRowSet = {
- var i = 0
val rowSize = rows.length
val tRows = new java.util.ArrayList[TRow](rowSize)
+ var i = 0
while (i < rowSize) {
val row = rows(i)
val tRow = new TRow()
@@ -151,13 +154,7 @@ object RowSet {
while (i < rowSize) {
val row = rows(i)
nulls.set(i, row.isNullAt(ordinal))
- val value =
- if (row.isNullAt(ordinal)) {
- ""
- } else {
- toHiveString((row.get(ordinal), typ), timeZone)
- }
- values.add(value)
+ values.add(toHiveString(row.get(ordinal) -> typ))
i += 1
}
TColumn.stringVal(new TStringColumn(values, nulls))
@@ -238,69 +235,12 @@ object RowSet {
case _ =>
val tStrValue = new TStringValue
if (!row.isNullAt(ordinal)) {
- tStrValue.setValue(
- toHiveString((row.get(ordinal), types(ordinal).dataType), timeZone))
+ tStrValue.setValue(toHiveString(row.get(ordinal) -> types(ordinal).dataType))
}
TColumnValue.stringVal(tStrValue)
}
}
- /**
- * A simpler impl of Spark's toHiveString
- */
- def toHiveString(dataWithType: (Any, DataType), timeZone: ZoneId): String = {
- dataWithType match {
- case (null, _) =>
- // Only match nulls in nested type values
- "null"
-
- case (d: Date, DateType) =>
- formatDate(d)
-
- case (ld: LocalDate, DateType) =>
- formatLocalDate(ld)
-
- case (t: Timestamp, TimestampType) =>
- formatTimestamp(t, Option(timeZone))
-
- case (t: LocalDateTime, ntz) if ntz.getClass.getSimpleName.equals(TIMESTAMP_NTZ) =>
- formatLocalDateTime(t)
-
- case (i: Instant, TimestampType) =>
- formatInstant(i, Option(timeZone))
-
- case (bin: Array[Byte], BinaryType) =>
- new String(bin, StandardCharsets.UTF_8)
-
- case (decimal: java.math.BigDecimal, DecimalType()) =>
- decimal.toPlainString
-
- case (s: String, StringType) =>
- // Only match string in nested type values
- "\"" + s + "\""
-
- case (d: Duration, _) => toDayTimeIntervalString(d)
-
- case (p: Period, _) => toYearMonthIntervalString(p)
-
- case (seq: scala.collection.Seq[_], ArrayType(typ, _)) =>
- seq.map(v => (v, typ)).map(e => toHiveString(e, timeZone)).mkString("[", ",", "]")
-
- case (m: Map[_, _], MapType(kType, vType, _)) =>
- m.map { case (key, value) =>
- toHiveString((key, kType), timeZone) + ":" + toHiveString((value, vType), timeZone)
- }.toSeq.sorted.mkString("{", ",", "}")
-
- case (struct: Row, StructType(fields)) =>
- struct.toSeq.zip(fields).map { case (v, t) =>
- s""""${t.name}":${toHiveString((v, t.dataType), timeZone)}"""
- }.mkString("{", ",", "}")
-
- case (other, _) =>
- other.toString
- }
- }
-
private def toTColumn(data: Array[Byte]): TColumn = {
val values = new java.util.ArrayList[ByteBuffer](1)
values.add(ByteBuffer.wrap(data))
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
index 23f7df213..46c3bce4d 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
@@ -22,7 +22,7 @@ import java.time.ZoneId
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.functions._
-import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType}
+import org.apache.spark.sql.types._
import org.apache.kyuubi.engine.spark.schema.RowSet
@@ -41,11 +41,11 @@ object SparkDatasetHelper {
val dt = DataType.fromDDL(schemaDDL)
dt match {
case StructType(Array(StructField(_, st: StructType, _, _))) =>
- RowSet.toHiveString((row, st), timeZone)
+ RowSet.toHiveString((row, st), nested = true)
case StructType(Array(StructField(_, at: ArrayType, _, _))) =>
- RowSet.toHiveString((row.toSeq.head, at), timeZone)
+ RowSet.toHiveString((row.toSeq.head, at), nested = true)
case StructType(Array(StructField(_, mt: MapType, _, _))) =>
- RowSet.toHiveString((row.toSeq.head, mt), timeZone)
+ RowSet.toHiveString((row.toSeq.head, mt), nested = true)
case _ =>
throw new UnsupportedOperationException
}
@@ -54,7 +54,7 @@ object SparkDatasetHelper {
val cols = df.schema.map {
case sf @ StructField(name, _: StructType, _, _) =>
toHiveStringUDF(quotedCol(name), lit(sf.toDDL)).as(name)
- case sf @ StructField(name, (_: MapType | _: ArrayType), _, _) =>
+ case sf @ StructField(name, _: MapType | _: ArrayType, _, _) =>
toHiveStringUDF(struct(quotedCol(name)), lit(sf.toDDL)).as(name)
case StructField(name, _, _, _) => quotedCol(name)
}
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/schema/RowSetSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/schema/RowSetSuite.scala
index 803eea3e6..a999563ea 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/schema/RowSetSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/schema/RowSetSuite.scala
@@ -30,7 +30,6 @@ import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.CalendarInterval
import org.apache.kyuubi.KyuubiFunSuite
-import org.apache.kyuubi.engine.spark.schema.RowSet.toHiveString
class RowSetSuite extends KyuubiFunSuite {
@@ -159,22 +158,22 @@ class RowSetSuite extends KyuubiFunSuite {
val decCol = cols.next().getStringVal
decCol.getValues.asScala.zipWithIndex.foreach {
- case (b, 11) => assert(b.isEmpty)
+ case (b, 11) => assert(b === "NULL")
case (b, i) => assert(b === s"$i.$i")
}
val dateCol = cols.next().getStringVal
dateCol.getValues.asScala.zipWithIndex.foreach {
- case (b, 11) => assert(b.isEmpty)
+ case (b, 11) => assert(b === "NULL")
case (b, i) =>
- assert(b === toHiveString((Date.valueOf(s"2018-11-${i + 1}"), DateType), zoneId))
+ assert(b === RowSet.toHiveString(Date.valueOf(s"2018-11-${i + 1}") -> DateType))
}
val tsCol = cols.next().getStringVal
tsCol.getValues.asScala.zipWithIndex.foreach {
- case (b, 11) => assert(b.isEmpty)
+ case (b, 11) => assert(b === "NULL")
case (b, i) => assert(b ===
- toHiveString((Timestamp.valueOf(s"2018-11-17 13:33:33.$i"), TimestampType), zoneId))
+ RowSet.toHiveString(Timestamp.valueOf(s"2018-11-17 13:33:33.$i") -> TimestampType))
}
val binCol = cols.next().getBinaryVal
@@ -185,23 +184,21 @@ class RowSetSuite extends KyuubiFunSuite {
val arrCol = cols.next().getStringVal
arrCol.getValues.asScala.zipWithIndex.foreach {
- case (b, 11) => assert(b === "")
- case (b, i) => assert(b === toHiveString(
- (Array.fill(i)(java.lang.Double.valueOf(s"$i.$i")).toSeq, ArrayType(DoubleType)),
- zoneId))
+ case (b, 11) => assert(b === "NULL")
+ case (b, i) => assert(b === RowSet.toHiveString(
+ Array.fill(i)(java.lang.Double.valueOf(s"$i.$i")).toSeq -> ArrayType(DoubleType)))
}
val mapCol = cols.next().getStringVal
mapCol.getValues.asScala.zipWithIndex.foreach {
- case (b, 11) => assert(b === "")
- case (b, i) => assert(b === toHiveString(
- (Map(i -> java.lang.Double.valueOf(s"$i.$i")), MapType(IntegerType, DoubleType)),
- zoneId))
+ case (b, 11) => assert(b === "NULL")
+ case (b, i) => assert(b === RowSet.toHiveString(
+ Map(i -> java.lang.Double.valueOf(s"$i.$i")) -> MapType(IntegerType, DoubleType)))
}
val intervalCol = cols.next().getStringVal
intervalCol.getValues.asScala.zipWithIndex.foreach {
- case (b, 11) => assert(b === "")
+ case (b, 11) => assert(b === "NULL")
case (b, i) => assert(b === new CalendarInterval(i, i, i).toString)
}
}
@@ -237,7 +234,7 @@ class RowSetSuite extends KyuubiFunSuite {
assert(r6.get(9).getStringVal.getValue === "2018-11-06")
val r7 = iter.next().getColVals
- assert(r7.get(10).getStringVal.getValue === "2018-11-17 13:33:33.600")
+ assert(r7.get(10).getStringVal.getValue === "2018-11-17 13:33:33.6")
assert(r7.get(11).getStringVal.getValue === new String(
Array.fill[Byte](6)(6.toByte),
StandardCharsets.UTF_8))
@@ -245,7 +242,7 @@ class RowSetSuite extends KyuubiFunSuite {
val r8 = iter.next().getColVals
assert(r8.get(12).getStringVal.getValue === Array.fill(7)(7.7d).mkString("[", ",", "]"))
assert(r8.get(13).getStringVal.getValue ===
- toHiveString((Map(7 -> 7.7d), MapType(IntegerType, DoubleType)), zoneId))
+ RowSet.toHiveString(Map(7 -> 7.7d) -> MapType(IntegerType, DoubleType)))
val r9 = iter.next().getColVals
assert(r9.get(14).getStringVal.getValue === new CalendarInterval(8, 8, 8).toString)
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/RowSetUtils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/RowSetUtils.scala
index 82417a730..fca79c0f2 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/RowSetUtils.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/RowSetUtils.scala
@@ -18,14 +18,11 @@
package org.apache.kyuubi.util
import java.nio.ByteBuffer
-import java.sql.Timestamp
-import java.time.{Duration, Instant, LocalDate, LocalDateTime, Period, ZoneId}
+import java.time.{Instant, LocalDate, LocalDateTime, ZoneId}
import java.time.chrono.IsoChronology
-import java.time.format.DateTimeFormatter
import java.time.format.DateTimeFormatterBuilder
import java.time.temporal.ChronoField
-import java.util.{Date, Locale, TimeZone}
-import java.util.concurrent.TimeUnit
+import java.util.{Date, Locale}
import scala.language.implicitConversions
@@ -37,24 +34,18 @@ private[kyuubi] object RowSetUtils {
final private val SECOND_PER_HOUR: Long = SECOND_PER_MINUTE * 60L
final private val SECOND_PER_DAY: Long = SECOND_PER_HOUR * 24L
- private lazy val dateFormatter = {
- createDateTimeFormatterBuilder().appendPattern("yyyy-MM-dd")
- .toFormatter(Locale.US)
- .withChronology(IsoChronology.INSTANCE)
- }
+ private lazy val dateFormatter = createDateTimeFormatterBuilder()
+ .appendPattern("yyyy-MM-dd")
+ .toFormatter(Locale.US)
+ .withChronology(IsoChronology.INSTANCE)
private lazy val legacyDateFormatter = FastDateFormat.getInstance("yyyy-MM-dd", Locale.US)
- private lazy val timestampFormatter: DateTimeFormatter = {
- createDateTimeFormatterBuilder().appendPattern("yyyy-MM-dd HH:mm:ss")
- .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
- .toFormatter(Locale.US)
- .withChronology(IsoChronology.INSTANCE)
- }
-
- private lazy val legacyTimestampFormatter = {
- FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss.SSS", Locale.US)
- }
+ private lazy val timestampFormatter = createDateTimeFormatterBuilder()
+ .appendPattern("yyyy-MM-dd HH:mm:ss")
+ .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
+ .toFormatter(Locale.US)
+ .withChronology(IsoChronology.INSTANCE)
private def createDateTimeFormatterBuilder(): DateTimeFormatterBuilder = {
new DateTimeFormatterBuilder().parseCaseInsensitive()
@@ -77,40 +68,7 @@ private[kyuubi] object RowSetUtils {
.getOrElse(timestampFormatter.format(i))
}
- def formatTimestamp(t: Timestamp, timeZone: Option[ZoneId] = None): String = {
- timeZone.map(zoneId => {
- FastDateFormat.getInstance(
- legacyTimestampFormatter.getPattern,
- TimeZone.getTimeZone(zoneId),
- legacyTimestampFormatter.getLocale)
- .format(t)
- }).getOrElse(legacyTimestampFormatter.format(t))
- }
-
implicit def bitSetToBuffer(bitSet: java.util.BitSet): ByteBuffer = {
ByteBuffer.wrap(bitSet.toByteArray)
}
-
- def toDayTimeIntervalString(d: Duration): String = {
- var rest = d.getSeconds
- var sign = ""
- if (d.getSeconds < 0) {
- sign = "-"
- rest = -rest
- }
- val days = TimeUnit.SECONDS.toDays(rest)
- rest %= SECOND_PER_DAY
- val hours = TimeUnit.SECONDS.toHours(rest)
- rest %= SECOND_PER_HOUR
- val minutes = TimeUnit.SECONDS.toMinutes(rest)
- val seconds = rest % SECOND_PER_MINUTE
- f"$sign$days $hours%02d:$minutes%02d:$seconds%02d.${d.getNano}%09d"
- }
-
- def toYearMonthIntervalString(d: Period): String = {
- val years = d.getYears
- val months = d.getMonths
- val sign = if (years < 0 || months < 0) "-" else ""
- s"$sign${Math.abs(years)}-${Math.abs(months)}"
- }
}
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkDataTypeTests.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkDataTypeTests.scala
index 688167703..3164ae496 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkDataTypeTests.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkDataTypeTests.scala
@@ -159,9 +159,10 @@ trait SparkDataTypeTests extends HiveJDBCTestHelper {
}
}
- test("execute statement - select timestamp") {
+ test("execute statement - select timestamp - second") {
withJdbcStatement() { statement =>
- val resultSet = statement.executeQuery("SELECT TIMESTAMP '2018-11-17 13:33:33' AS col")
+ val resultSet = statement.executeQuery(
+ "SELECT TIMESTAMP '2018-11-17 13:33:33' AS col")
assert(resultSet.next())
assert(resultSet.getTimestamp("col") === Timestamp.valueOf("2018-11-17 13:33:33"))
val metaData = resultSet.getMetaData
@@ -171,13 +172,39 @@ trait SparkDataTypeTests extends HiveJDBCTestHelper {
}
}
+ test("execute statement - select timestamp - millisecond") {
+ withJdbcStatement() { statement =>
+ val resultSet = statement.executeQuery(
+ "SELECT TIMESTAMP '2018-11-17 13:33:33.12345' AS col")
+ assert(resultSet.next())
+ assert(resultSet.getTimestamp("col") === Timestamp.valueOf("2018-11-17 13:33:33.12345"))
+ val metaData = resultSet.getMetaData
+ assert(metaData.getColumnType(1) === java.sql.Types.TIMESTAMP)
+ assert(metaData.getPrecision(1) === 29)
+ assert(metaData.getScale(1) === 9)
+ }
+ }
+
+ test("execute statement - select timestamp - overflow") {
+ withJdbcStatement() { statement =>
+ val resultSet = statement.executeQuery(
+ "SELECT TIMESTAMP '2018-11-17 13:33:33.1234567' AS col")
+ assert(resultSet.next())
+ assert(resultSet.getTimestamp("col") === Timestamp.valueOf("2018-11-17 13:33:33.123456"))
+ val metaData = resultSet.getMetaData
+ assert(metaData.getColumnType(1) === java.sql.Types.TIMESTAMP)
+ assert(metaData.getPrecision(1) === 29)
+ assert(metaData.getScale(1) === 9)
+ }
+ }
+
test("execute statement - select timestamp_ntz") {
assume(SPARK_ENGINE_VERSION >= "3.4")
withJdbcStatement() { statement =>
val resultSet = statement.executeQuery(
- "SELECT make_timestamp_ntz(2022, 03, 24, 18, 08, 31.800) AS col")
+ "SELECT make_timestamp_ntz(2022, 03, 24, 18, 08, 31.8888) AS col")
assert(resultSet.next())
- assert(resultSet.getTimestamp("col") === Timestamp.valueOf("2022-03-24 18:08:31.800"))
+ assert(resultSet.getTimestamp("col") === Timestamp.valueOf("2022-03-24 18:08:31.8888"))
val metaData = resultSet.getMetaData
assert(metaData.getColumnType(1) === java.sql.Types.TIMESTAMP)
assert(metaData.getPrecision(1) === 29)
diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/arrow/ArrowColumnarBatchRow.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/arrow/ArrowColumnarBatchRow.java
index 20ed55a1d..fa914ce5d 100644
--- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/arrow/ArrowColumnarBatchRow.java
+++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/arrow/ArrowColumnarBatchRow.java
@@ -105,6 +105,10 @@ public class ArrowColumnarBatchRow {
}
public Object get(int ordinal, TTypeId dataType) {
+ long seconds;
+ long milliseconds;
+ long microseconds;
+ int nanos;
switch (dataType) {
case BOOLEAN_TYPE:
return getBoolean(ordinal);
@@ -127,13 +131,17 @@ public class ArrowColumnarBatchRow {
case STRING_TYPE:
return getString(ordinal);
case TIMESTAMP_TYPE:
- return new Timestamp(getLong(ordinal) / 1000);
+ microseconds = getLong(ordinal);
+ nanos = (int) (microseconds % 1000000) * 1000;
+ Timestamp timestamp = new Timestamp(microseconds / 1000);
+ timestamp.setNanos(nanos);
+ return timestamp;
case DATE_TYPE:
return DateUtils.internalToDate(getInt(ordinal));
case INTERVAL_DAY_TIME_TYPE:
- long microseconds = getLong(ordinal);
- long seconds = microseconds / 1000000;
- int nanos = (int) (microseconds % 1000000) * 1000;
+ microseconds = getLong(ordinal);
+ seconds = microseconds / 1000000;
+ nanos = (int) (microseconds % 1000000) * 1000;
return new HiveIntervalDayTime(seconds, nanos);
case INTERVAL_YEAR_MONTH_TYPE:
return new HiveIntervalYearMonth(getInt(ordinal));
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkSqlEngineSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkSqlEngineSuite.scala
index 1e35d2f1d..9ab627413 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkSqlEngineSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkSqlEngineSuite.scala
@@ -139,13 +139,13 @@ class SparkSqlEngineSuite extends WithKyuubiServer with HiveJDBCTestHelper {
val utcResultSet = statement.executeQuery("select from_utc_timestamp(from_unixtime(" +
"1670404535000/1000,'yyyy-MM-dd HH:mm:ss'),'GMT+08:00')")
assert(utcResultSet.next())
- assert(utcResultSet.getString(1) == "2022-12-07 17:15:35.0")
+ assert(utcResultSet.getString(1) === "2022-12-07 17:15:35.0")
val setGMT8ResultSet = statement.executeQuery("set spark.sql.session.timeZone=GMT+8")
assert(setGMT8ResultSet.next())
val gmt8ResultSet = statement.executeQuery("select from_utc_timestamp(from_unixtime(" +
"1670404535000/1000,'yyyy-MM-dd HH:mm:ss'),'GMT+08:00')")
assert(gmt8ResultSet.next())
- assert(gmt8ResultSet.getString(1) == "2022-12-08 01:15:35.0")
+ assert(gmt8ResultSet.getString(1) === "2022-12-08 01:15:35.0")
}
}