You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2023/02/14 10:59:58 UTC

[kyuubi] branch master updated: [KYUUBI #4316] Fix returned Timestamp values may lose precision

This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8fe794709 [KYUUBI #4316] Fix returned Timestamp values may lose precision
8fe794709 is described below

commit 8fe794709bb2a3e28baa6dccf89f62b1dac5c285
Author: Cheng Pan <ch...@apache.org>
AuthorDate: Tue Feb 14 10:54:01 2023 +0000

    [KYUUBI #4316] Fix returned Timestamp values may lose precision
    
    ### _Why are the changes needed?_
    
    This PR proposes to use `org.apache.spark.sql.execution#toHiveString` to replace `org.apache.kyuubi.engine.spark.schema#toHiveString` to get consistent result w/ `spark-sql` and `STS`.
    
    Because of [SPARK-32006](https://issues.apache.org/jira/browse/SPARK-32006), it only works w/ Spark 3.1 and above.
    
    The patch takes effects on both thrift and arrow result format.
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [x] Add screenshots for manual tests if appropriate
    ```
    ➜  ~ beeline -u 'jdbc:hive2://0.0.0.0:10009/default'
    Connecting to jdbc:hive2://0.0.0.0:10009/default
    Connected to: Spark SQL (version 3.3.1)
    Driver: Hive JDBC (version 2.3.9)
    Transaction isolation: TRANSACTION_REPEATABLE_READ
    Beeline version 2.3.9 by Apache Hive
    0: jdbc:hive2://0.0.0.0:10009/default> select to_timestamp('2023-02-08 22:17:33.123456789');
    +----------------------------------------------+
    | to_timestamp(2023-02-08 22:17:33.123456789)  |
    +----------------------------------------------+
    | 2023-02-08 22:17:33.123456                   |
    +----------------------------------------------+
    1 row selected (0.415 seconds)
    ```
    
    - [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before make a pull request
    
    Closes #4318 from pan3793/hive-string.
    
    Closes #4316
    
    ba9016f6 [Cheng Pan] nit
    8be774b4 [Cheng Pan] nit
    bd696fe3 [Cheng Pan] nit
    b5cf051c [Cheng Pan] fix
    dd6b7021 [Cheng Pan] test
    63edd34d [Cheng Pan] nit
    37cc70af [Cheng Pan] Fix python ut
    c66ad22d [Cheng Pan] [KYUUBI #4316] Fix returned Timestamp values may lose precision
    41d94445 [Cheng Pan] Revert "[KYUUBI #3958] Fix Spark session timezone format"
    
    Authored-by: Cheng Pan <ch...@apache.org>
    Signed-off-by: Cheng Pan <ch...@apache.org>
---
 .../engine/spark/operation/ExecutePython.scala     | 14 ++--
 .../engine/spark/operation/SparkOperation.scala    | 51 ++++++++------
 .../apache/kyuubi/engine/spark/schema/RowSet.scala | 82 +++-------------------
 .../spark/sql/kyuubi/SparkDatasetHelper.scala      | 10 +--
 .../kyuubi/engine/spark/schema/RowSetSuite.scala   | 31 ++++----
 .../scala/org/apache/kyuubi/util/RowSetUtils.scala | 64 +++--------------
 .../kyuubi/operation/SparkDataTypeTests.scala      | 35 +++++++--
 .../jdbc/hive/arrow/ArrowColumnarBatchRow.java     | 16 +++--
 .../kyuubi/engine/spark/SparkSqlEngineSuite.scala  |  4 +-
 9 files changed, 123 insertions(+), 184 deletions(-)

diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
index e48ff6e5b..d2627fd99 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
@@ -88,9 +88,9 @@ class ExecutePython(
         val output = response.map(_.content.getOutput()).getOrElse("")
         val ename = response.map(_.content.getEname()).getOrElse("")
         val evalue = response.map(_.content.getEvalue()).getOrElse("")
-        val traceback = response.map(_.content.getTraceback()).getOrElse(Array.empty)
+        val traceback = response.map(_.content.getTraceback()).getOrElse(Seq.empty)
         iter =
-          new ArrayFetchIterator[Row](Array(Row(output, status, ename, evalue, Row(traceback: _*))))
+          new ArrayFetchIterator[Row](Array(Row(output, status, ename, evalue, traceback)))
         setState(OperationState.FINISHED)
       } else {
         throw KyuubiSQLException(s"Interpret error:\n$statement\n $response")
@@ -210,7 +210,7 @@ case class SessionPythonWorker(
     stdin.flush()
     val pythonResponse = Option(stdout.readLine()).map(ExecutePython.fromJson[PythonResponse](_))
     // throw exception if internal python code fail
-    if (internal && pythonResponse.map(_.content.status) != Some(PythonResponse.OK_STATUS)) {
+    if (internal && !pythonResponse.map(_.content.status).contains(PythonResponse.OK_STATUS)) {
       throw KyuubiSQLException(s"Internal python code $code failure: $pythonResponse")
     }
     pythonResponse
@@ -328,7 +328,7 @@ object ExecutePython extends Logging {
   }
 
   // for test
-  def defaultSparkHome(): String = {
+  def defaultSparkHome: String = {
     val homeDirFilter: FilenameFilter = (dir: File, name: String) =>
       dir.isDirectory && name.contains("spark-") && !name.contains("-engine")
     // get from kyuubi-server/../externals/kyuubi-download/target
@@ -418,7 +418,7 @@ case class PythonResponseContent(
     data: Map[String, String],
     ename: String,
     evalue: String,
-    traceback: Array[String],
+    traceback: Seq[String],
     status: String) {
   def getOutput(): String = {
     Option(data)
@@ -431,7 +431,7 @@ case class PythonResponseContent(
   def getEvalue(): String = {
     Option(evalue).getOrElse("")
   }
-  def getTraceback(): Array[String] = {
-    Option(traceback).getOrElse(Array.empty)
+  def getTraceback(): Seq[String] = {
+    Option(traceback).getOrElse(Seq.empty)
   }
 }
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
index b62ef6745..06884534d 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
@@ -24,6 +24,7 @@ import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TProgressU
 import org.apache.spark.kyuubi.{SparkProgressMonitor, SQLOperationListener}
 import org.apache.spark.kyuubi.SparkUtilsHelper.redact
 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 
 import org.apache.kyuubi.{KyuubiSQLException, Utils}
@@ -135,27 +136,35 @@ abstract class SparkOperation(session: Session)
     spark.sparkContext.setLocalProperty
 
   protected def withLocalProperties[T](f: => T): T = {
-    try {
-      spark.sparkContext.setJobGroup(statementId, redactedStatement, forceCancel)
-      spark.sparkContext.setLocalProperty(KYUUBI_SESSION_USER_KEY, session.user)
-      spark.sparkContext.setLocalProperty(KYUUBI_STATEMENT_ID_KEY, statementId)
-      schedulerPool match {
-        case Some(pool) =>
-          spark.sparkContext.setLocalProperty(SPARK_SCHEDULER_POOL_KEY, pool)
-        case None =>
-      }
-      if (isSessionUserSignEnabled) {
-        setSessionUserSign()
-      }
+    SQLConf.withExistingConf(spark.sessionState.conf) {
+      val originalSession = SparkSession.getActiveSession
+      try {
+        SparkSession.setActiveSession(spark)
+        spark.sparkContext.setJobGroup(statementId, redactedStatement, forceCancel)
+        spark.sparkContext.setLocalProperty(KYUUBI_SESSION_USER_KEY, session.user)
+        spark.sparkContext.setLocalProperty(KYUUBI_STATEMENT_ID_KEY, statementId)
+        schedulerPool match {
+          case Some(pool) =>
+            spark.sparkContext.setLocalProperty(SPARK_SCHEDULER_POOL_KEY, pool)
+          case None =>
+        }
+        if (isSessionUserSignEnabled) {
+          setSessionUserSign()
+        }
 
-      f
-    } finally {
-      spark.sparkContext.setLocalProperty(SPARK_SCHEDULER_POOL_KEY, null)
-      spark.sparkContext.setLocalProperty(KYUUBI_SESSION_USER_KEY, null)
-      spark.sparkContext.setLocalProperty(KYUUBI_STATEMENT_ID_KEY, null)
-      spark.sparkContext.clearJobGroup()
-      if (isSessionUserSignEnabled) {
-        clearSessionUserSign()
+        f
+      } finally {
+        spark.sparkContext.setLocalProperty(SPARK_SCHEDULER_POOL_KEY, null)
+        spark.sparkContext.setLocalProperty(KYUUBI_SESSION_USER_KEY, null)
+        spark.sparkContext.setLocalProperty(KYUUBI_STATEMENT_ID_KEY, null)
+        spark.sparkContext.clearJobGroup()
+        if (isSessionUserSignEnabled) {
+          clearSessionUserSign()
+        }
+        originalSession match {
+          case Some(session) => SparkSession.setActiveSession(session)
+          case None => SparkSession.clearActiveSession()
+        }
       }
     }
   }
@@ -246,7 +255,7 @@ abstract class SparkOperation(session: Session)
           } else {
             val taken = iter.take(rowSetSize)
             RowSet.toTRowSet(
-              taken.toList.asInstanceOf[List[Row]],
+              taken.toSeq.asInstanceOf[Seq[Row]],
               resultSchema,
               getProtocolVersion,
               timeZone)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/RowSet.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/RowSet.scala
index 8cc88156b..7be70403d 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/RowSet.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/RowSet.scala
@@ -18,22 +18,25 @@
 package org.apache.kyuubi.engine.spark.schema
 
 import java.nio.ByteBuffer
-import java.nio.charset.StandardCharsets
-import java.sql.Timestamp
-import java.time._
-import java.util.Date
+import java.time.ZoneId
 
 import scala.collection.JavaConverters._
 
 import org.apache.hive.service.rpc.thrift._
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.HiveResult
 import org.apache.spark.sql.types._
 
-import org.apache.kyuubi.engine.spark.schema.SchemaHelper.TIMESTAMP_NTZ
 import org.apache.kyuubi.util.RowSetUtils._
 
 object RowSet {
 
+  def toHiveString(valueAndType: (Any, DataType), nested: Boolean = false): String = {
+    // compatible w/ Spark 3.1 and above
+    val timeFormatters = HiveResult.getTimeFormatters
+    HiveResult.toHiveString(valueAndType, nested, timeFormatters)
+  }
+
   def toTRowSet(
       bytes: Array[Byte],
       protocolVersion: TProtocolVersion): TRowSet = {
@@ -68,9 +71,9 @@ object RowSet {
   }
 
   def toRowBasedSet(rows: Seq[Row], schema: StructType, timeZone: ZoneId): TRowSet = {
-    var i = 0
     val rowSize = rows.length
     val tRows = new java.util.ArrayList[TRow](rowSize)
+    var i = 0
     while (i < rowSize) {
       val row = rows(i)
       val tRow = new TRow()
@@ -151,13 +154,7 @@ object RowSet {
         while (i < rowSize) {
           val row = rows(i)
           nulls.set(i, row.isNullAt(ordinal))
-          val value =
-            if (row.isNullAt(ordinal)) {
-              ""
-            } else {
-              toHiveString((row.get(ordinal), typ), timeZone)
-            }
-          values.add(value)
+          values.add(toHiveString(row.get(ordinal) -> typ))
           i += 1
         }
         TColumn.stringVal(new TStringColumn(values, nulls))
@@ -238,69 +235,12 @@ object RowSet {
       case _ =>
         val tStrValue = new TStringValue
         if (!row.isNullAt(ordinal)) {
-          tStrValue.setValue(
-            toHiveString((row.get(ordinal), types(ordinal).dataType), timeZone))
+          tStrValue.setValue(toHiveString(row.get(ordinal) -> types(ordinal).dataType))
         }
         TColumnValue.stringVal(tStrValue)
     }
   }
 
-  /**
-   * A simpler impl of Spark's toHiveString
-   */
-  def toHiveString(dataWithType: (Any, DataType), timeZone: ZoneId): String = {
-    dataWithType match {
-      case (null, _) =>
-        // Only match nulls in nested type values
-        "null"
-
-      case (d: Date, DateType) =>
-        formatDate(d)
-
-      case (ld: LocalDate, DateType) =>
-        formatLocalDate(ld)
-
-      case (t: Timestamp, TimestampType) =>
-        formatTimestamp(t, Option(timeZone))
-
-      case (t: LocalDateTime, ntz) if ntz.getClass.getSimpleName.equals(TIMESTAMP_NTZ) =>
-        formatLocalDateTime(t)
-
-      case (i: Instant, TimestampType) =>
-        formatInstant(i, Option(timeZone))
-
-      case (bin: Array[Byte], BinaryType) =>
-        new String(bin, StandardCharsets.UTF_8)
-
-      case (decimal: java.math.BigDecimal, DecimalType()) =>
-        decimal.toPlainString
-
-      case (s: String, StringType) =>
-        // Only match string in nested type values
-        "\"" + s + "\""
-
-      case (d: Duration, _) => toDayTimeIntervalString(d)
-
-      case (p: Period, _) => toYearMonthIntervalString(p)
-
-      case (seq: scala.collection.Seq[_], ArrayType(typ, _)) =>
-        seq.map(v => (v, typ)).map(e => toHiveString(e, timeZone)).mkString("[", ",", "]")
-
-      case (m: Map[_, _], MapType(kType, vType, _)) =>
-        m.map { case (key, value) =>
-          toHiveString((key, kType), timeZone) + ":" + toHiveString((value, vType), timeZone)
-        }.toSeq.sorted.mkString("{", ",", "}")
-
-      case (struct: Row, StructType(fields)) =>
-        struct.toSeq.zip(fields).map { case (v, t) =>
-          s""""${t.name}":${toHiveString((v, t.dataType), timeZone)}"""
-        }.mkString("{", ",", "}")
-
-      case (other, _) =>
-        other.toString
-    }
-  }
-
   private def toTColumn(data: Array[Byte]): TColumn = {
     val values = new java.util.ArrayList[ByteBuffer](1)
     values.add(ByteBuffer.wrap(data))
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
index 23f7df213..46c3bce4d 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
@@ -22,7 +22,7 @@ import java.time.ZoneId
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Dataset, Row}
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType}
+import org.apache.spark.sql.types._
 
 import org.apache.kyuubi.engine.spark.schema.RowSet
 
@@ -41,11 +41,11 @@ object SparkDatasetHelper {
       val dt = DataType.fromDDL(schemaDDL)
       dt match {
         case StructType(Array(StructField(_, st: StructType, _, _))) =>
-          RowSet.toHiveString((row, st), timeZone)
+          RowSet.toHiveString((row, st), nested = true)
         case StructType(Array(StructField(_, at: ArrayType, _, _))) =>
-          RowSet.toHiveString((row.toSeq.head, at), timeZone)
+          RowSet.toHiveString((row.toSeq.head, at), nested = true)
         case StructType(Array(StructField(_, mt: MapType, _, _))) =>
-          RowSet.toHiveString((row.toSeq.head, mt), timeZone)
+          RowSet.toHiveString((row.toSeq.head, mt), nested = true)
         case _ =>
           throw new UnsupportedOperationException
       }
@@ -54,7 +54,7 @@ object SparkDatasetHelper {
     val cols = df.schema.map {
       case sf @ StructField(name, _: StructType, _, _) =>
         toHiveStringUDF(quotedCol(name), lit(sf.toDDL)).as(name)
-      case sf @ StructField(name, (_: MapType | _: ArrayType), _, _) =>
+      case sf @ StructField(name, _: MapType | _: ArrayType, _, _) =>
         toHiveStringUDF(struct(quotedCol(name)), lit(sf.toDDL)).as(name)
       case StructField(name, _, _, _) => quotedCol(name)
     }
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/schema/RowSetSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/schema/RowSetSuite.scala
index 803eea3e6..a999563ea 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/schema/RowSetSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/schema/RowSetSuite.scala
@@ -30,7 +30,6 @@ import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.CalendarInterval
 
 import org.apache.kyuubi.KyuubiFunSuite
-import org.apache.kyuubi.engine.spark.schema.RowSet.toHiveString
 
 class RowSetSuite extends KyuubiFunSuite {
 
@@ -159,22 +158,22 @@ class RowSetSuite extends KyuubiFunSuite {
 
     val decCol = cols.next().getStringVal
     decCol.getValues.asScala.zipWithIndex.foreach {
-      case (b, 11) => assert(b.isEmpty)
+      case (b, 11) => assert(b === "NULL")
       case (b, i) => assert(b === s"$i.$i")
     }
 
     val dateCol = cols.next().getStringVal
     dateCol.getValues.asScala.zipWithIndex.foreach {
-      case (b, 11) => assert(b.isEmpty)
+      case (b, 11) => assert(b === "NULL")
       case (b, i) =>
-        assert(b === toHiveString((Date.valueOf(s"2018-11-${i + 1}"), DateType), zoneId))
+        assert(b === RowSet.toHiveString(Date.valueOf(s"2018-11-${i + 1}") -> DateType))
     }
 
     val tsCol = cols.next().getStringVal
     tsCol.getValues.asScala.zipWithIndex.foreach {
-      case (b, 11) => assert(b.isEmpty)
+      case (b, 11) => assert(b === "NULL")
       case (b, i) => assert(b ===
-          toHiveString((Timestamp.valueOf(s"2018-11-17 13:33:33.$i"), TimestampType), zoneId))
+          RowSet.toHiveString(Timestamp.valueOf(s"2018-11-17 13:33:33.$i") -> TimestampType))
     }
 
     val binCol = cols.next().getBinaryVal
@@ -185,23 +184,21 @@ class RowSetSuite extends KyuubiFunSuite {
 
     val arrCol = cols.next().getStringVal
     arrCol.getValues.asScala.zipWithIndex.foreach {
-      case (b, 11) => assert(b === "")
-      case (b, i) => assert(b === toHiveString(
-          (Array.fill(i)(java.lang.Double.valueOf(s"$i.$i")).toSeq, ArrayType(DoubleType)),
-          zoneId))
+      case (b, 11) => assert(b === "NULL")
+      case (b, i) => assert(b === RowSet.toHiveString(
+          Array.fill(i)(java.lang.Double.valueOf(s"$i.$i")).toSeq -> ArrayType(DoubleType)))
     }
 
     val mapCol = cols.next().getStringVal
     mapCol.getValues.asScala.zipWithIndex.foreach {
-      case (b, 11) => assert(b === "")
-      case (b, i) => assert(b === toHiveString(
-          (Map(i -> java.lang.Double.valueOf(s"$i.$i")), MapType(IntegerType, DoubleType)),
-          zoneId))
+      case (b, 11) => assert(b === "NULL")
+      case (b, i) => assert(b === RowSet.toHiveString(
+          Map(i -> java.lang.Double.valueOf(s"$i.$i")) -> MapType(IntegerType, DoubleType)))
     }
 
     val intervalCol = cols.next().getStringVal
     intervalCol.getValues.asScala.zipWithIndex.foreach {
-      case (b, 11) => assert(b === "")
+      case (b, 11) => assert(b === "NULL")
       case (b, i) => assert(b === new CalendarInterval(i, i, i).toString)
     }
   }
@@ -237,7 +234,7 @@ class RowSetSuite extends KyuubiFunSuite {
     assert(r6.get(9).getStringVal.getValue === "2018-11-06")
 
     val r7 = iter.next().getColVals
-    assert(r7.get(10).getStringVal.getValue === "2018-11-17 13:33:33.600")
+    assert(r7.get(10).getStringVal.getValue === "2018-11-17 13:33:33.6")
     assert(r7.get(11).getStringVal.getValue === new String(
       Array.fill[Byte](6)(6.toByte),
       StandardCharsets.UTF_8))
@@ -245,7 +242,7 @@ class RowSetSuite extends KyuubiFunSuite {
     val r8 = iter.next().getColVals
     assert(r8.get(12).getStringVal.getValue === Array.fill(7)(7.7d).mkString("[", ",", "]"))
     assert(r8.get(13).getStringVal.getValue ===
-      toHiveString((Map(7 -> 7.7d), MapType(IntegerType, DoubleType)), zoneId))
+      RowSet.toHiveString(Map(7 -> 7.7d) -> MapType(IntegerType, DoubleType)))
 
     val r9 = iter.next().getColVals
     assert(r9.get(14).getStringVal.getValue === new CalendarInterval(8, 8, 8).toString)
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/RowSetUtils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/RowSetUtils.scala
index 82417a730..fca79c0f2 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/RowSetUtils.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/RowSetUtils.scala
@@ -18,14 +18,11 @@
 package org.apache.kyuubi.util
 
 import java.nio.ByteBuffer
-import java.sql.Timestamp
-import java.time.{Duration, Instant, LocalDate, LocalDateTime, Period, ZoneId}
+import java.time.{Instant, LocalDate, LocalDateTime, ZoneId}
 import java.time.chrono.IsoChronology
-import java.time.format.DateTimeFormatter
 import java.time.format.DateTimeFormatterBuilder
 import java.time.temporal.ChronoField
-import java.util.{Date, Locale, TimeZone}
-import java.util.concurrent.TimeUnit
+import java.util.{Date, Locale}
 
 import scala.language.implicitConversions
 
@@ -37,24 +34,18 @@ private[kyuubi] object RowSetUtils {
   final private val SECOND_PER_HOUR: Long = SECOND_PER_MINUTE * 60L
   final private val SECOND_PER_DAY: Long = SECOND_PER_HOUR * 24L
 
-  private lazy val dateFormatter = {
-    createDateTimeFormatterBuilder().appendPattern("yyyy-MM-dd")
-      .toFormatter(Locale.US)
-      .withChronology(IsoChronology.INSTANCE)
-  }
+  private lazy val dateFormatter = createDateTimeFormatterBuilder()
+    .appendPattern("yyyy-MM-dd")
+    .toFormatter(Locale.US)
+    .withChronology(IsoChronology.INSTANCE)
 
   private lazy val legacyDateFormatter = FastDateFormat.getInstance("yyyy-MM-dd", Locale.US)
 
-  private lazy val timestampFormatter: DateTimeFormatter = {
-    createDateTimeFormatterBuilder().appendPattern("yyyy-MM-dd HH:mm:ss")
-      .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
-      .toFormatter(Locale.US)
-      .withChronology(IsoChronology.INSTANCE)
-  }
-
-  private lazy val legacyTimestampFormatter = {
-    FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss.SSS", Locale.US)
-  }
+  private lazy val timestampFormatter = createDateTimeFormatterBuilder()
+    .appendPattern("yyyy-MM-dd HH:mm:ss")
+    .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
+    .toFormatter(Locale.US)
+    .withChronology(IsoChronology.INSTANCE)
 
   private def createDateTimeFormatterBuilder(): DateTimeFormatterBuilder = {
     new DateTimeFormatterBuilder().parseCaseInsensitive()
@@ -77,40 +68,7 @@ private[kyuubi] object RowSetUtils {
       .getOrElse(timestampFormatter.format(i))
   }
 
-  def formatTimestamp(t: Timestamp, timeZone: Option[ZoneId] = None): String = {
-    timeZone.map(zoneId => {
-      FastDateFormat.getInstance(
-        legacyTimestampFormatter.getPattern,
-        TimeZone.getTimeZone(zoneId),
-        legacyTimestampFormatter.getLocale)
-        .format(t)
-    }).getOrElse(legacyTimestampFormatter.format(t))
-  }
-
   implicit def bitSetToBuffer(bitSet: java.util.BitSet): ByteBuffer = {
     ByteBuffer.wrap(bitSet.toByteArray)
   }
-
-  def toDayTimeIntervalString(d: Duration): String = {
-    var rest = d.getSeconds
-    var sign = ""
-    if (d.getSeconds < 0) {
-      sign = "-"
-      rest = -rest
-    }
-    val days = TimeUnit.SECONDS.toDays(rest)
-    rest %= SECOND_PER_DAY
-    val hours = TimeUnit.SECONDS.toHours(rest)
-    rest %= SECOND_PER_HOUR
-    val minutes = TimeUnit.SECONDS.toMinutes(rest)
-    val seconds = rest % SECOND_PER_MINUTE
-    f"$sign$days $hours%02d:$minutes%02d:$seconds%02d.${d.getNano}%09d"
-  }
-
-  def toYearMonthIntervalString(d: Period): String = {
-    val years = d.getYears
-    val months = d.getMonths
-    val sign = if (years < 0 || months < 0) "-" else ""
-    s"$sign${Math.abs(years)}-${Math.abs(months)}"
-  }
 }
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkDataTypeTests.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkDataTypeTests.scala
index 688167703..3164ae496 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkDataTypeTests.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkDataTypeTests.scala
@@ -159,9 +159,10 @@ trait SparkDataTypeTests extends HiveJDBCTestHelper {
     }
   }
 
-  test("execute statement - select timestamp") {
+  test("execute statement - select timestamp - second") {
     withJdbcStatement() { statement =>
-      val resultSet = statement.executeQuery("SELECT TIMESTAMP '2018-11-17 13:33:33' AS col")
+      val resultSet = statement.executeQuery(
+        "SELECT TIMESTAMP '2018-11-17 13:33:33' AS col")
       assert(resultSet.next())
       assert(resultSet.getTimestamp("col") === Timestamp.valueOf("2018-11-17 13:33:33"))
       val metaData = resultSet.getMetaData
@@ -171,13 +172,39 @@ trait SparkDataTypeTests extends HiveJDBCTestHelper {
     }
   }
 
+  test("execute statement - select timestamp - millisecond") {
+    withJdbcStatement() { statement =>
+      val resultSet = statement.executeQuery(
+        "SELECT TIMESTAMP '2018-11-17 13:33:33.12345' AS col")
+      assert(resultSet.next())
+      assert(resultSet.getTimestamp("col") === Timestamp.valueOf("2018-11-17 13:33:33.12345"))
+      val metaData = resultSet.getMetaData
+      assert(metaData.getColumnType(1) === java.sql.Types.TIMESTAMP)
+      assert(metaData.getPrecision(1) === 29)
+      assert(metaData.getScale(1) === 9)
+    }
+  }
+
+  test("execute statement - select timestamp - overflow") {
+    withJdbcStatement() { statement =>
+      val resultSet = statement.executeQuery(
+        "SELECT TIMESTAMP '2018-11-17 13:33:33.1234567' AS col")
+      assert(resultSet.next())
+      assert(resultSet.getTimestamp("col") === Timestamp.valueOf("2018-11-17 13:33:33.123456"))
+      val metaData = resultSet.getMetaData
+      assert(metaData.getColumnType(1) === java.sql.Types.TIMESTAMP)
+      assert(metaData.getPrecision(1) === 29)
+      assert(metaData.getScale(1) === 9)
+    }
+  }
+
   test("execute statement - select timestamp_ntz") {
     assume(SPARK_ENGINE_VERSION >= "3.4")
     withJdbcStatement() { statement =>
       val resultSet = statement.executeQuery(
-        "SELECT make_timestamp_ntz(2022, 03, 24, 18, 08, 31.800) AS col")
+        "SELECT make_timestamp_ntz(2022, 03, 24, 18, 08, 31.8888) AS col")
       assert(resultSet.next())
-      assert(resultSet.getTimestamp("col") === Timestamp.valueOf("2022-03-24 18:08:31.800"))
+      assert(resultSet.getTimestamp("col") === Timestamp.valueOf("2022-03-24 18:08:31.8888"))
       val metaData = resultSet.getMetaData
       assert(metaData.getColumnType(1) === java.sql.Types.TIMESTAMP)
       assert(metaData.getPrecision(1) === 29)
diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/arrow/ArrowColumnarBatchRow.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/arrow/ArrowColumnarBatchRow.java
index 20ed55a1d..fa914ce5d 100644
--- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/arrow/ArrowColumnarBatchRow.java
+++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/arrow/ArrowColumnarBatchRow.java
@@ -105,6 +105,10 @@ public class ArrowColumnarBatchRow {
   }
 
   public Object get(int ordinal, TTypeId dataType) {
+    long seconds;
+    long milliseconds;
+    long microseconds;
+    int nanos;
     switch (dataType) {
       case BOOLEAN_TYPE:
         return getBoolean(ordinal);
@@ -127,13 +131,17 @@ public class ArrowColumnarBatchRow {
       case STRING_TYPE:
         return getString(ordinal);
       case TIMESTAMP_TYPE:
-        return new Timestamp(getLong(ordinal) / 1000);
+        microseconds = getLong(ordinal);
+        nanos = (int) (microseconds % 1000000) * 1000;
+        Timestamp timestamp = new Timestamp(microseconds / 1000);
+        timestamp.setNanos(nanos);
+        return timestamp;
       case DATE_TYPE:
         return DateUtils.internalToDate(getInt(ordinal));
       case INTERVAL_DAY_TIME_TYPE:
-        long microseconds = getLong(ordinal);
-        long seconds = microseconds / 1000000;
-        int nanos = (int) (microseconds % 1000000) * 1000;
+        microseconds = getLong(ordinal);
+        seconds = microseconds / 1000000;
+        nanos = (int) (microseconds % 1000000) * 1000;
         return new HiveIntervalDayTime(seconds, nanos);
       case INTERVAL_YEAR_MONTH_TYPE:
         return new HiveIntervalYearMonth(getInt(ordinal));
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkSqlEngineSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkSqlEngineSuite.scala
index 1e35d2f1d..9ab627413 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkSqlEngineSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkSqlEngineSuite.scala
@@ -139,13 +139,13 @@ class SparkSqlEngineSuite extends WithKyuubiServer with HiveJDBCTestHelper {
       val utcResultSet = statement.executeQuery("select from_utc_timestamp(from_unixtime(" +
         "1670404535000/1000,'yyyy-MM-dd HH:mm:ss'),'GMT+08:00')")
       assert(utcResultSet.next())
-      assert(utcResultSet.getString(1) == "2022-12-07 17:15:35.0")
+      assert(utcResultSet.getString(1) === "2022-12-07 17:15:35.0")
       val setGMT8ResultSet = statement.executeQuery("set spark.sql.session.timeZone=GMT+8")
       assert(setGMT8ResultSet.next())
       val gmt8ResultSet = statement.executeQuery("select from_utc_timestamp(from_unixtime(" +
         "1670404535000/1000,'yyyy-MM-dd HH:mm:ss'),'GMT+08:00')")
       assert(gmt8ResultSet.next())
-      assert(gmt8ResultSet.getString(1) == "2022-12-08 01:15:35.0")
+      assert(gmt8ResultSet.getString(1) === "2022-12-08 01:15:35.0")
     }
   }