Posted to commits@kyuubi.apache.org by ch...@apache.org on 2023/02/18 17:10:37 UTC

[kyuubi] branch branch-1.7 updated: [KYUUBI #4326] [ARROW] Fix Spark session timezone format in arrow-based result format

This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-1.7
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/branch-1.7 by this push:
     new e75f91df9 [KYUUBI #4326] [ARROW] Fix Spark session timezone format in arrow-based result format
e75f91df9 is described below

commit e75f91df9167ae458b343f974df12c6b99992684
Author: Fu Chen <cf...@gmail.com>
AuthorDate: Sun Feb 19 01:10:04 2023 +0800

    [KYUUBI #4326] [ARROW] Fix Spark session timezone format in arrow-based result format
    
    ### _Why are the changes needed?_
    
    1. This PR introduces a new configuration, `kyuubi.operation.result.arrow.timestampAsString`; when true, arrow-based rowsets convert timestamp-type columns to strings for transmission (see the sketch after this list).
    
    2. `kyuubi.operation.result.arrow.timestampAsString` defaults to false for better transmission performance.
    
    3. This PR fixes the timezone issue in the arrow-based result format described in #3958.
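    
    A minimal sketch of turning the new option on from a JDBC client (the configuration key is taken from this PR; the connection endpoint and query are hypothetical):
    
        // Scala sketch: enable timestampAsString over a Kyuubi JDBC connection
        import java.sql.DriverManager
    
        val conn = DriverManager.getConnection("jdbc:hive2://localhost:10009/default") // hypothetical endpoint
        val stmt = conn.createStatement()
        stmt.executeQuery("set kyuubi.operation.result.arrow.timestampAsString=true")
        val rs = stmt.executeQuery("SELECT current_timestamp()")
        while (rs.next()) println(rs.getString(1)) // with the flag on, the timestamp arrives as a string
        conn.close()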
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly, including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #4326 from cfmcgrady/arrow-string-ts.
    
    Closes #4326
    
    38c7fc9b [Fu Chen] fix style
    d864db00 [Fu Chen] address comment
    b714b3ee [Fu Chen] revert externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/RowSet.scala
    6c4eb507 [Fu Chen] minor
    289b6007 [Fu Chen] timstampAsString = false by default
    78b7caba [Fu Chen] fix
    f5601356 [Fu Chen] debug info
    b8e4b288 [Fu Chen] fix ut
    87c6f9ef [Fu Chen] update docs
    86f6cb73 [Fu Chen] arrow based rowset timestamp as string
    
    Authored-by: Fu Chen <cf...@gmail.com>
    Signed-off-by: Cheng Pan <ch...@apache.org>
    (cherry picked from commit 6bd0016fe271d0e5abfcd558e9204cfdc8641978)
    Signed-off-by: Cheng Pan <ch...@apache.org>
---
 docs/deployment/settings.md                        | 33 ++++++-------
 .../engine/spark/operation/ExecuteStatement.scala  | 13 ++---
 .../engine/spark/operation/SparkOperation.scala    |  8 +++-
 .../spark/sql/kyuubi/SparkDatasetHelper.scala      | 13 +++--
 .../operation/SparkArrowbasedOperationSuite.scala  | 56 +++++++++++++++++++++-
 .../org/apache/kyuubi/config/KyuubiConf.scala      |  8 ++++
 .../kyuubi/jdbc/hive/JdbcColumnAttributes.java     |  2 +-
 .../jdbc/hive/KyuubiArrowBasedResultSet.java       | 14 ++++--
 .../jdbc/hive/KyuubiArrowQueryResultSet.java       | 54 +++++++++++----------
 .../apache/kyuubi/jdbc/hive/KyuubiStatement.java   | 16 ++++++-
 .../jdbc/hive/arrow/ArrowColumnarBatchRow.java     | 20 ++++----
 11 files changed, 167 insertions(+), 70 deletions(-)

diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index 8d0e32436..1e0567860 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -429,22 +429,23 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
 
 ### Operation
 
-|                   Key                   |                                     Default                                     |                                                                                                                                                                                                                                                 Meaning                                                                                                                        [...]
-|-----------------------------------------|---------------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...]
-| kyuubi.operation.idle.timeout           | PT3H                                                                            | Operation will be closed when it's not accessed for this duration of time                                                                                                                                                                                                                                                                                                      [...]
-| kyuubi.operation.interrupt.on.cancel    | true                                                                            | When true, all running tasks will be interrupted if one cancels a query. When false, all running tasks will remain until finished.                                                                                                                                                                                                                                             [...]
-| kyuubi.operation.language               | SQL                                                                             | Choose a programing language for the following inputs<ul><li>SQL: (Default) Run all following statements as SQL queries.</li><li>SCALA: Run all following input as scala codes</li><li>PYTHON: (Experimental) Run all following input as Python codes with Spark engine</li></ul>                                                                                              [...]
-| kyuubi.operation.log.dir.root           | server_operation_logs                                                           | Root directory for query operation log at server-side.                                                                                                                                                                                                                                                                                                                         [...]
-| kyuubi.operation.plan.only.excludes     | ResetCommand,SetCommand,SetNamespaceCommand,UseStatement,SetCatalogAndNamespace | Comma-separated list of query plan names, in the form of simple class names, i.e, for `SET abc=xyz`, the value will be `SetCommand`. For those auxiliary plans, such as `switch databases`, `set properties`, or `create temporary view` etc., which are used for setup evaluating environments for analyzing actual queries, we can use this config to exclude them and let t [...]
-| kyuubi.operation.plan.only.mode         | none                                                                            | Configures the statement performed mode, The value can be 'parse', 'analyze', 'optimize', 'optimize_with_stats', 'physical', 'execution', or 'none', when it is 'none', indicate to the statement will be fully executed, otherwise only way without executing the query. different engines currently support different modes, the Spark engine supports all modes, and the Fl [...]
-| kyuubi.operation.plan.only.output.style | plain                                                                           | Configures the planOnly output style. The value can be 'plain' or 'json', and the default value is 'plain'. This configuration supports only the output styles of the Spark engine                                                                                                                                                                                             [...]
-| kyuubi.operation.progress.enabled       | false                                                                           | Whether to enable the operation progress. When true, the operation progress will be returned in `GetOperationStatus`.                                                                                                                                                                                                                                                          [...]
-| kyuubi.operation.query.timeout          | &lt;undefined&gt;                                                               | Timeout for query executions at server-side, take effect with client-side timeout(`java.sql.Statement.setQueryTimeout`) together, a running query will be cancelled automatically if timeout. It's off by default, which means only client-side take full control of whether the query should timeout or not. If set, client-side timeout is capped at this point. To cancel t [...]
-| kyuubi.operation.result.format          | thrift                                                                          | Specify the result format, available configs are: <ul> <li>THRIFT: the result will convert to TRow at the engine driver side. </li> <li>ARROW: the result will be encoded as Arrow at the executor side before collecting by the driver, and deserialized at the client side. note that it only takes effect for kyuubi-hive-jdbc clients now.</li></ul>                       [...]
-| kyuubi.operation.result.max.rows        | 0                                                                               | Max rows of Spark query results. Rows exceeding the limit would be ignored. By setting this value to 0 to disable the max rows limit.                                                                                                                                                                                                                                          [...]
-| kyuubi.operation.scheduler.pool         | &lt;undefined&gt;                                                               | The scheduler pool of job. Note that, this config should be used after changing Spark config spark.scheduler.mode=FAIR.                                                                                                                                                                                                                                                        [...]
-| kyuubi.operation.spark.listener.enabled | true                                                                            | When set to true, Spark engine registers an SQLOperationListener before executing the statement, logging a few summary statistics when each stage completes.                                                                                                                                                                                                                   [...]
-| kyuubi.operation.status.polling.timeout | PT5S                                                                            | Timeout(ms) for long polling asynchronous running sql query's status                                                                                                                                                                                                                                                                                                           [...]
+|                       Key                       |                                     Default                                     |                                                                                                                                                                                                                                                 Meaning                                                                                                                [...]
+|-------------------------------------------------|---------------------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...]
+| kyuubi.operation.idle.timeout                   | PT3H                                                                            | Operation will be closed when it's not accessed for this duration of time                                                                                                                                                                                                                                                                                              [...]
+| kyuubi.operation.interrupt.on.cancel            | true                                                                            | When true, all running tasks will be interrupted if one cancels a query. When false, all running tasks will remain until finished.                                                                                                                                                                                                                                     [...]
+| kyuubi.operation.language                       | SQL                                                                             | Choose a programing language for the following inputs<ul><li>SQL: (Default) Run all following statements as SQL queries.</li><li>SCALA: Run all following input as scala codes</li><li>PYTHON: (Experimental) Run all following input as Python codes with Spark engine</li></ul>                                                                                      [...]
+| kyuubi.operation.log.dir.root                   | server_operation_logs                                                           | Root directory for query operation log at server-side.                                                                                                                                                                                                                                                                                                                 [...]
+| kyuubi.operation.plan.only.excludes             | ResetCommand,SetCommand,SetNamespaceCommand,UseStatement,SetCatalogAndNamespace | Comma-separated list of query plan names, in the form of simple class names, i.e, for `SET abc=xyz`, the value will be `SetCommand`. For those auxiliary plans, such as `switch databases`, `set properties`, or `create temporary view` etc., which are used for setup evaluating environments for analyzing actual queries, we can use this config to exclude them a [...]
+| kyuubi.operation.plan.only.mode                 | none                                                                            | Configures the statement performed mode, The value can be 'parse', 'analyze', 'optimize', 'optimize_with_stats', 'physical', 'execution', or 'none', when it is 'none', indicate to the statement will be fully executed, otherwise only way without executing the query. different engines currently support different modes, the Spark engine supports all modes, an [...]
+| kyuubi.operation.plan.only.output.style         | plain                                                                           | Configures the planOnly output style. The value can be 'plain' or 'json', and the default value is 'plain'. This configuration supports only the output styles of the Spark engine                                                                                                                                                                                     [...]
+| kyuubi.operation.progress.enabled               | false                                                                           | Whether to enable the operation progress. When true, the operation progress will be returned in `GetOperationStatus`.                                                                                                                                                                                                                                                  [...]
+| kyuubi.operation.query.timeout                  | &lt;undefined&gt;                                                               | Timeout for query executions at server-side, take effect with client-side timeout(`java.sql.Statement.setQueryTimeout`) together, a running query will be cancelled automatically if timeout. It's off by default, which means only client-side take full control of whether the query should timeout or not. If set, client-side timeout is capped at this point. To  [...]
+| kyuubi.operation.result.arrow.timestampAsString | false                                                                           | When true, arrow-based rowsets will convert columns of type timestamp to strings for transmission.                                                                                                                                                                                                                                                                     [...]
+| kyuubi.operation.result.format                  | thrift                                                                          | Specify the result format, available configs are: <ul> <li>THRIFT: the result will convert to TRow at the engine driver side. </li> <li>ARROW: the result will be encoded as Arrow at the executor side before collecting by the driver, and deserialized at the client side. note that it only takes effect for kyuubi-hive-jdbc clients now.</li></ul>               [...]
+| kyuubi.operation.result.max.rows                | 0                                                                               | Max rows of Spark query results. Rows exceeding the limit would be ignored. By setting this value to 0 to disable the max rows limit.                                                                                                                                                                                                                                  [...]
+| kyuubi.operation.scheduler.pool                 | &lt;undefined&gt;                                                               | The scheduler pool of job. Note that, this config should be used after changing Spark config spark.scheduler.mode=FAIR.                                                                                                                                                                                                                                                [...]
+| kyuubi.operation.spark.listener.enabled         | true                                                                            | When set to true, Spark engine registers an SQLOperationListener before executing the statement, logging a few summary statistics when each stage completes.                                                                                                                                                                                                           [...]
+| kyuubi.operation.status.polling.timeout         | PT5S                                                                            | Timeout(ms) for long polling asynchronous running sql query's status                                                                                                                                                                                                                                                                                                   [...]
 
 ### Server
 
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
index fac90f7ea..2b90525c1 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
@@ -162,17 +162,12 @@ class ExecuteStatement(
     }
   }
 
-  // TODO:(fchen) make this configurable
-  val kyuubiBeelineConvertToString = true
-
   def convertComplexType(df: DataFrame): DataFrame = {
-    if (kyuubiBeelineConvertToString) {
-      SparkDatasetHelper.convertTopLevelComplexTypeToHiveString(df)
-    } else {
-      df
-    }
+    SparkDatasetHelper.convertTopLevelComplexTypeToHiveString(df, timestampAsString)
   }
 
   override def getResultSetMetadataHints(): Seq[String] =
-    Seq(s"__kyuubi_operation_result_format__=$resultFormat")
+    Seq(
+      s"__kyuubi_operation_result_format__=$resultFormat",
+      s"__kyuubi_operation_result_arrow_timestampAsString__=$timestampAsString")
 }
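
The two hints above reach the JDBC client as plain `key=value` strings. A rough sketch of folding them into a properties map on the client side (mirroring, not quoting, what KyuubiStatement does further down in this patch):

    // Scala sketch: parse result-set metadata hints of the assumed form "key=value"
    val hints = Seq(
      "__kyuubi_operation_result_format__=arrow",
      "__kyuubi_operation_result_arrow_timestampAsString__=true")
    val props: Map[String, String] = hints.flatMap { hint =>
      hint.split("=", 2) match {
        case Array(k, v) => Some(k -> v)
        case _           => None
      }
    }.toMap
    val timestampAsString =
      props.getOrElse("__kyuubi_operation_result_arrow_timestampAsString__", "false").toBoolean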
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
index 06884534d..a6a7fc896 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
@@ -24,7 +24,7 @@ import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TProgressU
 import org.apache.spark.kyuubi.{SparkProgressMonitor, SQLOperationListener}
 import org.apache.spark.kyuubi.SparkUtilsHelper.redact
 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.types.StructType
 
 import org.apache.kyuubi.{KyuubiSQLException, Utils}
@@ -136,7 +136,7 @@ abstract class SparkOperation(session: Session)
     spark.sparkContext.setLocalProperty
 
   protected def withLocalProperties[T](f: => T): T = {
-    SQLConf.withExistingConf(spark.sessionState.conf) {
+    SQLExecution.withSQLConfPropagated(spark) {
       val originalSession = SparkSession.getActiveSession
       try {
         SparkSession.setActiveSession(spark)
@@ -279,6 +279,10 @@ abstract class SparkOperation(session: Session)
     spark.conf.get("kyuubi.operation.result.format", "thrift")
   }
 
+  protected def timestampAsString: Boolean = {
+    spark.conf.get("kyuubi.operation.result.arrow.timestampAsString", "false").toBoolean
+  }
+
   protected def setSessionUserSign(): Unit = {
     (
       session.conf.get(KYUUBI_SESSION_SIGN_PUBLICKEY),
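
A plausible reading of the `SQLConf.withExistingConf` -> `SQLExecution.withSQLConfPropagated` change above: the former only installs the session conf thread-locally on the driver, while the latter also propagates the effective SQL conf (including `spark.sql.session.timeZone`) to the tasks that produce the Arrow batches. A hedged sketch of the pattern:

    // Scala sketch: run a block with the session's SQLConf propagated to tasks it launches
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.SQLExecution

    def withSessionConf[T](spark: SparkSession)(body: => T): T =
      SQLExecution.withSQLConfPropagated(spark) {
        body // executor-side SQLConf.get now reflects this session's settings
      }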
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
index 46c3bce4d..1a5429373 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.kyuubi
 
-import java.time.ZoneId
-
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Dataset, Row}
 import org.apache.spark.sql.functions._
@@ -31,12 +29,13 @@ object SparkDatasetHelper {
     ds.toArrowBatchRdd
   }
 
-  def convertTopLevelComplexTypeToHiveString(df: DataFrame): DataFrame = {
-    val timeZone = ZoneId.of(df.sparkSession.sessionState.conf.sessionLocalTimeZone)
+  def convertTopLevelComplexTypeToHiveString(
+      df: DataFrame,
+      timestampAsString: Boolean): DataFrame = {
 
     val quotedCol = (name: String) => col(quoteIfNeeded(name))
 
-    // an udf to call `RowSet.toHiveString` on complex types(struct/array/map).
+    // an udf to call `RowSet.toHiveString` on complex types(struct/array/map) and timestamp type.
     val toHiveStringUDF = udf[String, Row, String]((row, schemaDDL) => {
       val dt = DataType.fromDDL(schemaDDL)
       dt match {
@@ -46,6 +45,8 @@ object SparkDatasetHelper {
           RowSet.toHiveString((row.toSeq.head, at), nested = true)
         case StructType(Array(StructField(_, mt: MapType, _, _))) =>
           RowSet.toHiveString((row.toSeq.head, mt), nested = true)
+        case StructType(Array(StructField(_, tt: TimestampType, _, _))) =>
+          RowSet.toHiveString((row.toSeq.head, tt), nested = true)
         case _ =>
           throw new UnsupportedOperationException
       }
@@ -56,6 +57,8 @@ object SparkDatasetHelper {
         toHiveStringUDF(quotedCol(name), lit(sf.toDDL)).as(name)
       case sf @ StructField(name, _: MapType | _: ArrayType, _, _) =>
         toHiveStringUDF(struct(quotedCol(name)), lit(sf.toDDL)).as(name)
+      case sf @ StructField(name, _: TimestampType, _, _) if timestampAsString =>
+        toHiveStringUDF(struct(quotedCol(name)), lit(sf.toDDL)).as(name)
       case StructField(name, _, _, _) => quotedCol(name)
     }
     df.select(cols: _*)
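
A hedged usage sketch of the reworked helper (the method signature comes from the diff above; the SparkSession and query are made up for illustration):

    // Scala sketch: stringify top-level complex and timestamp columns before Arrow encoding
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.kyuubi.SparkDatasetHelper

    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    val df = spark.sql("SELECT map('k', 1) AS m, current_timestamp() AS ts")
    // with timestampAsString = true, both `m` and `ts` are selected through the toHiveString UDF
    val stringified = SparkDatasetHelper.convertTopLevelComplexTypeToHiveString(df, timestampAsString = true)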
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala
index e46456914..60cc52891 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala
@@ -35,6 +35,13 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp
 
   override def resultFormat: String = "arrow"
 
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    withJdbcStatement() { statement =>
+      checkResultSetFormat(statement, "arrow")
+    }
+  }
+
   test("detect resultSet format") {
     withJdbcStatement() { statement =>
       checkResultSetFormat(statement, "arrow")
@@ -43,7 +50,42 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp
     }
   }
 
-  def checkResultSetFormat(statement: Statement, expectFormat: String): Unit = {
+  test("Spark session timezone format") {
+    withJdbcStatement() { statement =>
+      def check(expect: String): Unit = {
+        val query =
+          """
+            |SELECT
+            |  from_utc_timestamp(
+            |    from_unixtime(
+            |      1670404535000 / 1000, 'yyyy-MM-dd HH:mm:ss'
+            |    ),
+            |    'GMT+08:00'
+            |  )
+            |""".stripMargin
+        val resultSet = statement.executeQuery(query)
+        assert(resultSet.next())
+        assert(resultSet.getString(1) == expect)
+      }
+
+      def setTimeZone(timeZone: String): Unit = {
+        val rs = statement.executeQuery(s"set spark.sql.session.timeZone=$timeZone")
+        assert(rs.next())
+      }
+
+      Seq("true", "false").foreach { timestampAsString =>
+        statement.executeQuery(
+          s"set ${KyuubiConf.ARROW_BASED_ROWSET_TIMESTAMP_AS_STRING.key}=$timestampAsString")
+        checkArrowBasedRowSetTimestampAsString(statement, timestampAsString)
+        setTimeZone("UTC")
+        check("2022-12-07 17:15:35.0")
+        setTimeZone("GMT+8")
+        check("2022-12-08 01:15:35.0")
+      }
+    }
+  }
+
+  private def checkResultSetFormat(statement: Statement, expectFormat: String): Unit = {
     val query =
       s"""
          |SELECT '$${hivevar:${KyuubiConf.OPERATION_RESULT_FORMAT.key}}' AS col
@@ -52,4 +94,16 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp
     assert(resultSet.next())
     assert(resultSet.getString("col") === expectFormat)
   }
+
+  private def checkArrowBasedRowSetTimestampAsString(
+      statement: Statement,
+      expect: String): Unit = {
+    val query =
+      s"""
+         |SELECT '$${hivevar:${KyuubiConf.ARROW_BASED_ROWSET_TIMESTAMP_AS_STRING.key}}' AS col
+         |""".stripMargin
+    val resultSet = statement.executeQuery(query)
+    assert(resultSet.next())
+    assert(resultSet.getString("col") === expect)
+  }
 }
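
The expected literals in the test follow from plain epoch arithmetic, sketched here with java.time alone:

    // Scala sketch: 1670404535 seconds since the epoch is 2022-12-07 09:15:35 UTC
    import java.time.{Instant, ZoneId}

    val instant = Instant.ofEpochSecond(1670404535L)
    instant.atZone(ZoneId.of("UTC"))   // 2022-12-07T09:15:35Z
    instant.atZone(ZoneId.of("GMT+8")) // 2022-12-07T17:15:35+08:00

`from_unixtime` renders that instant in the session timezone, and `from_utc_timestamp(..., 'GMT+08:00')` then shifts the rendered wall clock forward eight hours, so a UTC session yields 17:15:35 and a GMT+8 session yields 01:15:35 on the next day, matching the assertions above.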
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 14a05e749..05b6a056f 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -1676,6 +1676,14 @@ object KyuubiConf {
       .transform(_.toLowerCase(Locale.ROOT))
       .createWithDefault("thrift")
 
+  val ARROW_BASED_ROWSET_TIMESTAMP_AS_STRING: ConfigEntry[Boolean] =
+    buildConf("kyuubi.operation.result.arrow.timestampAsString")
+      .doc("When true, arrow-based rowsets will convert columns of type timestamp to strings for" +
+        " transmission.")
+      .version("1.7.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val SERVER_OPERATION_LOG_DIR_ROOT: ConfigEntry[String] =
     buildConf("kyuubi.operation.log.dir.root")
       .doc("Root directory for query operation log at server-side.")
diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcColumnAttributes.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcColumnAttributes.java
index 06fb39899..b0257cfff 100644
--- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcColumnAttributes.java
+++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcColumnAttributes.java
@@ -20,7 +20,7 @@ package org.apache.kyuubi.jdbc.hive;
 public class JdbcColumnAttributes {
   public int precision = 0;
   public int scale = 0;
-  public String timeZone = "";
+  public String timeZone = null;
 
   public JdbcColumnAttributes() {}
 
diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiArrowBasedResultSet.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiArrowBasedResultSet.java
index c3e75c0ea..ef5008503 100644
--- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiArrowBasedResultSet.java
+++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiArrowBasedResultSet.java
@@ -50,6 +50,7 @@ public abstract class KyuubiArrowBasedResultSet implements SQLResultSet {
   protected Schema arrowSchema;
   protected VectorSchemaRoot root;
   protected ArrowColumnarBatchRow row;
+  protected boolean timestampAsString = true;
 
   protected BufferAllocator allocator;
 
@@ -312,11 +313,18 @@ public abstract class KyuubiArrowBasedResultSet implements SQLResultSet {
       if (wasNull) {
         return null;
       } else {
-        return row.get(columnIndex - 1, columnType);
+        JdbcColumnAttributes attributes = columnAttributes.get(columnIndex - 1);
+        return row.get(
+            columnIndex - 1,
+            columnType,
+            attributes == null ? null : attributes.timeZone,
+            timestampAsString);
       }
     } catch (Exception e) {
-      e.printStackTrace();
-      throw new KyuubiSQLException("Unrecognized column type:", e);
+      throw new KyuubiSQLException(
+          String.format(
+              "Error getting row of type %s at column index %d", columnType, columnIndex - 1),
+          e);
     }
   }
 
diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiArrowQueryResultSet.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiArrowQueryResultSet.java
index 1f2af29dc..fda70f463 100644
--- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiArrowQueryResultSet.java
+++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiArrowQueryResultSet.java
@@ -58,9 +58,6 @@ public class KyuubiArrowQueryResultSet extends KyuubiArrowBasedResultSet {
   private boolean isScrollable = false;
   private boolean fetchFirst = false;
 
-  // TODO:(fchen) make this configurable
-  protected boolean convertComplexTypeToString = true;
-
   private final TProtocolVersion protocol;
 
   public static class Builder {
@@ -87,6 +84,8 @@ public class KyuubiArrowQueryResultSet extends KyuubiArrowBasedResultSet {
     private boolean isScrollable = false;
     private ReentrantLock transportLock = null;
 
+    private boolean timestampAsString = true;
+
     public Builder(Statement statement) throws SQLException {
       this.statement = statement;
       this.connection = statement.getConnection();
@@ -153,6 +152,11 @@ public class KyuubiArrowQueryResultSet extends KyuubiArrowBasedResultSet {
       return this;
     }
 
+    public Builder setTimestampAsString(boolean timestampAsString) {
+      this.timestampAsString = timestampAsString;
+      return this;
+    }
+
     public Builder setTransportLock(ReentrantLock transportLock) {
       this.transportLock = transportLock;
       return this;
@@ -189,10 +193,10 @@ public class KyuubiArrowQueryResultSet extends KyuubiArrowBasedResultSet {
       this.maxRows = builder.maxRows;
     }
     this.isScrollable = builder.isScrollable;
+    this.timestampAsString = builder.timestampAsString;
     this.protocol = builder.getProtocolVersion();
     arrowSchema =
-        ArrowUtils.toArrowSchema(
-            columnNames, convertComplexTypeToStringType(columnTypes), columnAttributes);
+        ArrowUtils.toArrowSchema(columnNames, convertToStringType(columnTypes), columnAttributes);
     if (allocator == null) {
       initArrowSchemaAndAllocator();
     }
@@ -271,8 +275,7 @@ public class KyuubiArrowQueryResultSet extends KyuubiArrowBasedResultSet {
         columnAttributes.add(getColumnAttributes(primitiveTypeEntry));
       }
       arrowSchema =
-          ArrowUtils.toArrowSchema(
-              columnNames, convertComplexTypeToStringType(columnTypes), columnAttributes);
+          ArrowUtils.toArrowSchema(columnNames, convertToStringType(columnTypes), columnAttributes);
     } catch (SQLException eS) {
       throw eS; // rethrow the SQLException as is
     } catch (Exception ex) {
@@ -480,22 +483,25 @@ public class KyuubiArrowQueryResultSet extends KyuubiArrowBasedResultSet {
     return isClosed;
   }
 
-  private List<TTypeId> convertComplexTypeToStringType(List<TTypeId> colTypes) {
-    if (convertComplexTypeToString) {
-      return colTypes.stream()
-          .map(
-              type -> {
-                if (type == TTypeId.ARRAY_TYPE
-                    || type == TTypeId.MAP_TYPE
-                    || type == TTypeId.STRUCT_TYPE) {
-                  return TTypeId.STRING_TYPE;
-                } else {
-                  return type;
-                }
-              })
-          .collect(Collectors.toList());
-    } else {
-      return colTypes;
-    }
+  /**
+   * 1. the complex types (map/array/struct) are always converted to string type to transport 2. if
+   * the user set `timestampAsString = true`, then the timestamp type will be converted to string
+   * type too.
+   */
+  private List<TTypeId> convertToStringType(List<TTypeId> colTypes) {
+    return colTypes.stream()
+        .map(
+            type -> {
+              if ((type == TTypeId.ARRAY_TYPE
+                      || type == TTypeId.MAP_TYPE
+                      || type == TTypeId.STRUCT_TYPE) // complex type (map/array/struct)
+                  // timestamp type
+                  || (type == TTypeId.TIMESTAMP_TYPE && timestampAsString)) {
+                return TTypeId.STRING_TYPE;
+              } else {
+                return type;
+              }
+            })
+        .collect(Collectors.toList());
   }
 }
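
A tiny worked example of the mapping rule documented above (the input column types and flag value are invented):

    // Scala sketch: complex types always become strings; timestamps only when timestampAsString is set
    import org.apache.hive.service.rpc.thrift.TTypeId

    val timestampAsString = true
    val colTypes = Seq(TTypeId.INT_TYPE, TTypeId.ARRAY_TYPE, TTypeId.TIMESTAMP_TYPE)
    val converted = colTypes.map {
      case TTypeId.ARRAY_TYPE | TTypeId.MAP_TYPE | TTypeId.STRUCT_TYPE => TTypeId.STRING_TYPE
      case TTypeId.TIMESTAMP_TYPE if timestampAsString                 => TTypeId.STRING_TYPE
      case other                                                       => other
    }
    // converted == Seq(INT_TYPE, STRING_TYPE, STRING_TYPE)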
diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiStatement.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiStatement.java
index ab7c06a55..b452ca6aa 100644
--- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiStatement.java
+++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiStatement.java
@@ -37,6 +37,7 @@ public class KyuubiStatement implements SQLStatement, KyuubiLoggable {
   public static final Logger LOG = LoggerFactory.getLogger(KyuubiStatement.class.getName());
   public static final int DEFAULT_FETCH_SIZE = 1000;
   public static final String DEFAULT_RESULT_FORMAT = "thrift";
+  public static final String DEFAULT_ARROW_TIMESTAMP_AS_STRING = "false";
   private final KyuubiConnection connection;
   private TCLIService.Iface client;
   private TOperationHandle stmtHandle = null;
@@ -45,7 +46,8 @@ public class KyuubiStatement implements SQLStatement, KyuubiLoggable {
   private int fetchSize = DEFAULT_FETCH_SIZE;
   private boolean isScrollableResultset = false;
   private boolean isOperationComplete = false;
-  private Map<String, String> properties = new HashMap<>();
+
+  private Map<String, String> properties = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
   /**
    * We need to keep a reference to the result set to support the following: <code>
    * statement.execute(String sql);
@@ -213,6 +215,11 @@ public class KyuubiStatement implements SQLStatement, KyuubiLoggable {
     LOG.info("kyuubi.operation.result.format: " + resultFormat);
     switch (resultFormat) {
       case "arrow":
+        boolean timestampAsString =
+            Boolean.parseBoolean(
+                properties.getOrDefault(
+                    "__kyuubi_operation_result_arrow_timestampAsString__",
+                    DEFAULT_ARROW_TIMESTAMP_AS_STRING));
         resultSet =
             new KyuubiArrowQueryResultSet.Builder(this)
                 .setClient(client)
@@ -222,6 +229,7 @@ public class KyuubiStatement implements SQLStatement, KyuubiLoggable {
                 .setFetchSize(fetchSize)
                 .setScrollable(isScrollableResultset)
                 .setSchema(columnNames, columnTypes, columnAttributes)
+                .setTimestampAsString(timestampAsString)
                 .build();
         break;
       default:
@@ -270,6 +278,11 @@ public class KyuubiStatement implements SQLStatement, KyuubiLoggable {
     LOG.info("kyuubi.operation.result.format: " + resultFormat);
     switch (resultFormat) {
       case "arrow":
+        boolean timestampAsString =
+            Boolean.parseBoolean(
+                properties.getOrDefault(
+                    "__kyuubi_operation_result_arrow_timestampAsString__",
+                    DEFAULT_ARROW_TIMESTAMP_AS_STRING));
         resultSet =
             new KyuubiArrowQueryResultSet.Builder(this)
                 .setClient(client)
@@ -279,6 +292,7 @@ public class KyuubiStatement implements SQLStatement, KyuubiLoggable {
                 .setFetchSize(fetchSize)
                 .setScrollable(isScrollableResultset)
                 .setSchema(columnNames, columnTypes, columnAttributes)
+                .setTimestampAsString(timestampAsString)
                 .build();
         break;
       default:
diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/arrow/ArrowColumnarBatchRow.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/arrow/ArrowColumnarBatchRow.java
index fa914ce5d..373867069 100644
--- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/arrow/ArrowColumnarBatchRow.java
+++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/arrow/ArrowColumnarBatchRow.java
@@ -19,6 +19,8 @@ package org.apache.kyuubi.jdbc.hive.arrow;
 
 import java.math.BigDecimal;
 import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import org.apache.arrow.vector.util.DateUtility;
 import org.apache.hive.service.rpc.thrift.TTypeId;
 import org.apache.kyuubi.jdbc.hive.common.DateUtils;
 import org.apache.kyuubi.jdbc.hive.common.HiveIntervalDayTime;
@@ -104,7 +106,7 @@ public class ArrowColumnarBatchRow {
     throw new UnsupportedOperationException();
   }
 
-  public Object get(int ordinal, TTypeId dataType) {
+  public Object get(int ordinal, TTypeId dataType, String timeZone, boolean timestampAsString) {
     long seconds;
     long milliseconds;
     long microseconds;
@@ -131,17 +133,19 @@ public class ArrowColumnarBatchRow {
       case STRING_TYPE:
         return getString(ordinal);
       case TIMESTAMP_TYPE:
-        microseconds = getLong(ordinal);
-        nanos = (int) (microseconds % 1000000) * 1000;
-        Timestamp timestamp = new Timestamp(microseconds / 1000);
-        timestamp.setNanos(nanos);
-        return timestamp;
+        if (timestampAsString) {
+          return Timestamp.valueOf(getString(ordinal));
+        } else {
+          LocalDateTime localDateTime =
+              DateUtility.getLocalDateTimeFromEpochMicro(getLong(ordinal), timeZone);
+          return Timestamp.valueOf(localDateTime);
+        }
       case DATE_TYPE:
         return DateUtils.internalToDate(getInt(ordinal));
       case INTERVAL_DAY_TIME_TYPE:
         microseconds = getLong(ordinal);
-        seconds = microseconds / 1000000;
-        nanos = (int) (microseconds % 1000000) * 1000;
+        seconds = microseconds / 1_000_000;
+        nanos = (int) (microseconds % 1_000_000) * 1_000;
         return new HiveIntervalDayTime(seconds, nanos);
       case INTERVAL_YEAR_MONTH_TYPE:
         return new HiveIntervalYearMonth(getInt(ordinal));
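
The non-string branch above rebuilds a wall-clock timestamp from Arrow's epoch microseconds plus the session timezone carried in the column attributes; an equivalent computation using only java.time (the micros value and zone are invented):

    // Scala sketch: epoch microseconds + session zone -> java.sql.Timestamp wall-clock value
    import java.sql.Timestamp
    import java.time.{Instant, LocalDateTime, ZoneId}

    val micros = 1670404535000000L // 2022-12-07 09:15:35 UTC
    val zone = ZoneId.of("GMT+08:00")
    val instant = Instant.ofEpochSecond(micros / 1000000L, (micros % 1000000L) * 1000L)
    val ts = Timestamp.valueOf(LocalDateTime.ofInstant(instant, zone))
    // ts.toString == "2022-12-07 17:15:35.0"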