Posted to commits@spark.apache.org by we...@apache.org on 2020/05/30 06:17:49 UTC

[spark] branch branch-3.0 updated: [SPARK-31859][SPARK-31861][SPARK-31863] Fix Thriftserver session timezone issues

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 0507e6e  [SPARK-31859][SPARK-31861][SPARK-31863] Fix Thriftserver session timezone issues
0507e6e is described below

commit 0507e6e8d460b083876bd874115837651c19420e
Author: Juliusz Sompolski <ju...@databricks.com>
AuthorDate: Sat May 30 06:14:32 2020 +0000

    [SPARK-31859][SPARK-31861][SPARK-31863] Fix Thriftserver session timezone issues
    
    ### What changes were proposed in this pull request?
    
    Timestamp literals in Spark are interpreted as timestamps in the local timezone specified by spark.sql.session.timeZone.
    
    If the JDBC client is, e.g., in timezone UTC-7, sets spark.sql.session.timeZone to PST, and sends the query "SELECT timestamp '2020-05-20 12:00:00'", and the JVM timezone of the Spark cluster is, e.g., UTC+2, then what currently happens is (a minimal client-side sketch follows the list):
    * The timestamp literal in the query is interpreted as 12:00:00 UTC-7, i.e. 19:00:00 UTC.
    * When it is returned from the query, it is collected as a java.sql.Timestamp object with Dataset.collect() and put into a Thriftserver RowSet.
    * Before being sent over the wire, the Timestamp is converted to String. This happens explicitly in ColumnValue for RowBasedSet, and implicitly in ColumnBuffer for ColumnBasedSet (all non-primitive types are converted with toString() there). The conversion to String uses the JVM timezone, which results in a "21:00:00" (UTC+2) string representation.
    * The client JDBC application gets a "21:00:00" Timestamp back (in its JVM timezone; if the JDBC application cares about the correct internal UTC value, it should set spark.sql.session.timeZone to be consistent with its JVM timezone).
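    
    Below is a minimal client-side sketch of this scenario. The connection URL, the GMT-7 client zone, and the program scaffolding are illustrative assumptions (the Hive JDBC driver must be on the classpath); it is not part of this patch:
    
        import java.sql.DriverManager
        import java.util.TimeZone
    
        object TimezoneRepro {
          def main(args: Array[String]): Unit = {
            // Client JVM runs in UTC-7, matching the scenario above (hypothetical).
            TimeZone.setDefault(TimeZone.getTimeZone("GMT-7"))
            val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000")
            val st = conn.createStatement()
            st.execute("SET spark.sql.session.timeZone=PST")
            val rs = st.executeQuery("SELECT timestamp '2020-05-20 12:00:00'")
            rs.next()
            // Before this fix, a cluster JVM in UTC+2 rendered the value with
            // toString() in its own zone, so the client got "2020-05-20 21:00:00.0"
            // instead of "2020-05-20 12:00:00.0".
            println(rs.getString(1))
            conn.close()
          }
        }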
    
    The problem is caused by the conversion happening in the Thriftserver RowSet with the generic toString() function, instead of HiveResult.toHiveString(), which takes care of correct, timezone-respecting conversions. This PR fixes it by converting the Timestamp values to String earlier, in SparkExecuteStatementOperation, using that function. This fixes SPARK-31861.
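    
    For reference, a standalone sketch (not the committed code) of the conversion this PR moves into SparkExecuteStatementOperation; it assumes an active SparkSession so that SQLConf.get resolves the session timezone:
    
        import java.sql.Timestamp
        import org.apache.spark.sql.execution.HiveResult
        import org.apache.spark.sql.types.TimestampType
    
        // toHiveString formats using spark.sql.session.timeZone via SQLConf.get,
        // unlike Timestamp.toString(), which always uses the JVM default zone.
        val ts = Timestamp.valueOf("2020-05-20 12:00:00")
        val rendered: String = HiveResult.toHiveString((ts, TimestampType))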
    
    Thriftserver also did not work with spark.sql.datetime.java8API.enabled, because the conversions in RowSet expected a Timestamp object instead of an Instant object. Using HiveResult.toHiveString() also fixes that. For the same reason, we now convert Date values in SparkExecuteStatementOperation too, so that HiveResult.toHiveString() handles LocalDate as well. This fixes SPARK-31859.
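    
    For illustration, a sketch of what the Java 8 API flag changes about collected rows (assumes an existing SparkSession named spark; not part of the patch):
    
        spark.conf.set("spark.sql.datetime.java8API.enabled", "true")
        val row = spark.sql(
          "SELECT date '2020-05-28', timestamp '2020-05-28 00:00:00'").collect().head
        // With the flag on, collect() yields java.time values, which the old
        // casts to java.sql.Date/Timestamp in RowSet could not handle.
        assert(row.get(0).isInstanceOf[java.time.LocalDate])
        assert(row.get(1).isInstanceOf[java.time.Instant])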
    
    Thriftserver also did not correctly set the active SparkSession. Because of that, configuration obtained via SQLConf.get was not the correct session configuration, which affected getting the correct spark.sql.session.timeZone. It is fixed by extending the use of SparkExecuteStatementOperation.withSchedulerPool (now withLocalProperties) to also set the correct active SparkSession. When the correct session is set, we also no longer need to maintain the pool mapping in a sessionToActivePool map; the scheduler pool is read directly from the session conf (SQLConf.THRIFTSERVER_POOL) when the operation runs. This fixes SPARK-31863.
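    
    A sketch of why the active session matters (sessionForThisOperation is a hypothetical value standing in for the operation's session):
    
        import org.apache.spark.sql.SparkSession
        import org.apache.spark.sql.internal.SQLConf
    
        // SQLConf.get resolves from the active SparkSession of the current
        // thread; a worker thread that never sets one sees another session's
        // (or the default) configuration, including the wrong timezone.
        SparkSession.setActiveSession(sessionForThisOperation)
        val tz = SQLConf.get.getConf(SQLConf.SESSION_LOCAL_TIMEZONE)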
    
    I used the opportunity to move some repetitive code from the operations into the new SparkOperation mixin trait.
    
    Closes #28671 from juliuszsompolski/SPARK-31861.
    
    Authored-by: Juliusz Sompolski <ju...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit af35691de449aca2e8db8d6dd7092255a919a04b)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../SparkExecuteStatementOperation.scala           | 59 ++++---------
 .../thriftserver/SparkGetCatalogsOperation.scala   | 14 +---
 .../thriftserver/SparkGetColumnsOperation.scala    | 13 +--
 .../thriftserver/SparkGetFunctionsOperation.scala  | 14 +---
 .../thriftserver/SparkGetSchemasOperation.scala    | 14 +---
 .../thriftserver/SparkGetTableTypesOperation.scala | 13 +--
 .../thriftserver/SparkGetTablesOperation.scala     | 13 +--
 .../thriftserver/SparkGetTypeInfoOperation.scala   | 13 +--
 .../thriftserver/SparkMetadataOperationUtils.scala | 34 --------
 .../sql/hive/thriftserver/SparkOperation.scala     | 96 ++++++++++++++++++++++
 .../hive/thriftserver/SparkSQLSessionManager.scala |  1 -
 .../server/SparkSQLOperationManager.scala          |  5 +-
 .../thriftserver/HiveThriftServer2Suites.scala     | 55 +++++++++++++
 .../org/apache/hive/service/cli/ColumnValue.java   | 20 +----
 .../hive/service/cli/operation/Operation.java      |  5 +-
 .../org/apache/hive/service/cli/ColumnValue.java   | 20 +----
 .../hive/service/cli/operation/Operation.java      |  5 +-
 17 files changed, 209 insertions(+), 185 deletions(-)

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index d14d70f..b193c73 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -44,12 +44,13 @@ import org.apache.spark.unsafe.types.CalendarInterval
 import org.apache.spark.util.{Utils => SparkUtils}
 
 private[hive] class SparkExecuteStatementOperation(
+    val sqlContext: SQLContext,
     parentSession: HiveSession,
     statement: String,
     confOverlay: JMap[String, String],
     runInBackground: Boolean = true)
-    (sqlContext: SQLContext, sessionToActivePool: JMap[SessionHandle, String])
   extends ExecuteStatementOperation(parentSession, statement, confOverlay, runInBackground)
+  with SparkOperation
   with Logging {
 
   private var result: DataFrame = _
@@ -62,7 +63,6 @@ private[hive] class SparkExecuteStatementOperation(
   private var previousFetchStartOffset: Long = 0
   private var iter: Iterator[SparkRow] = _
   private var dataTypes: Array[DataType] = _
-  private var statementId: String = _
 
   private lazy val resultSchema: TableSchema = {
     if (result == null || result.schema.isEmpty) {
@@ -73,13 +73,6 @@ private[hive] class SparkExecuteStatementOperation(
     }
   }
 
-  override def close(): Unit = {
-    // RDDs will be cleaned automatically upon garbage collection.
-    logInfo(s"Close statement with $statementId")
-    cleanup(OperationState.CLOSED)
-    HiveThriftServer2.eventManager.onOperationClosed(statementId)
-  }
-
   def addNonNullColumnValue(from: SparkRow, to: ArrayBuffer[Any], ordinal: Int): Unit = {
     dataTypes(ordinal) match {
       case StringType =>
@@ -100,12 +93,15 @@ private[hive] class SparkExecuteStatementOperation(
         to += from.getByte(ordinal)
       case ShortType =>
         to += from.getShort(ordinal)
-      case DateType =>
-        to += from.getAs[Date](ordinal)
-      case TimestampType =>
-        to += from.getAs[Timestamp](ordinal)
       case BinaryType =>
         to += from.getAs[Array[Byte]](ordinal)
+      // SPARK-31859, SPARK-31861: Date and Timestamp need to be turned to String here to:
+      // - respect spark.sql.session.timeZone
+      // - work with spark.sql.datetime.java8API.enabled
+      // These types have always been sent over the wire as string, converted later.
+      case _: DateType | _: TimestampType =>
+        val hiveString = HiveResult.toHiveString((from.get(ordinal), dataTypes(ordinal)))
+        to += hiveString
       case CalendarIntervalType =>
         to += HiveResult.toHiveString((from.getAs[CalendarInterval](ordinal), CalendarIntervalType))
       case _: ArrayType | _: StructType | _: MapType | _: UserDefinedType[_] =>
@@ -114,7 +110,7 @@ private[hive] class SparkExecuteStatementOperation(
     }
   }
 
-  def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = withSchedulerPool {
+  def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = withLocalProperties {
     log.info(s"Received getNextRowSet request order=${order} and maxRowsL=${maxRowsL} " +
       s"with ${statementId}")
     validateDefaultFetchOrientation(order)
@@ -193,7 +189,6 @@ private[hive] class SparkExecuteStatementOperation(
 
   override def runInternal(): Unit = {
     setState(OperationState.PENDING)
-    statementId = UUID.randomUUID().toString
     logInfo(s"Submitting query '$statement' with $statementId")
     HiveThriftServer2.eventManager.onStatementStart(
       statementId,
@@ -217,7 +212,9 @@ private[hive] class SparkExecuteStatementOperation(
             override def run(): Unit = {
               registerCurrentOperationLog()
               try {
-                execute()
+                withLocalProperties {
+                  execute()
+                }
               } catch {
                 case e: HiveSQLException =>
                   setOperationException(e)
@@ -259,7 +256,7 @@ private[hive] class SparkExecuteStatementOperation(
     }
   }
 
-  private def execute(): Unit = withSchedulerPool {
+  private def execute(): Unit = {
     try {
       synchronized {
         if (getStatus.getState.isTerminal) {
@@ -282,13 +279,6 @@ private[hive] class SparkExecuteStatementOperation(
       sqlContext.sparkContext.setJobGroup(statementId, statement)
       result = sqlContext.sql(statement)
       logDebug(result.queryExecution.toString())
-      result.queryExecution.logical match {
-        case SetCommand(Some((SQLConf.THRIFTSERVER_POOL.key, Some(value)))) =>
-          sessionToActivePool.put(parentSession.getSessionHandle, value)
-          logInfo(s"Setting ${SparkContext.SPARK_SCHEDULER_POOL}=$value for future statements " +
-            "in this session.")
-        case _ =>
-      }
       HiveThriftServer2.eventManager.onStatementParsed(statementId,
         result.queryExecution.toString())
       iter = {
@@ -346,38 +336,25 @@ private[hive] class SparkExecuteStatementOperation(
     synchronized {
       if (!getStatus.getState.isTerminal) {
         logInfo(s"Cancel query with $statementId")
-        cleanup(OperationState.CANCELED)
+        cleanup()
+        setState(OperationState.CANCELED)
         HiveThriftServer2.eventManager.onStatementCanceled(statementId)
       }
     }
   }
 
-  private def cleanup(state: OperationState): Unit = {
-    setState(state)
+  override protected def cleanup(): Unit = {
     if (runInBackground) {
       val backgroundHandle = getBackgroundHandle()
       if (backgroundHandle != null) {
         backgroundHandle.cancel(true)
       }
     }
+    // RDDs will be cleaned automatically upon garbage collection.
     if (statementId != null) {
       sqlContext.sparkContext.cancelJobGroup(statementId)
     }
   }
-
-  private def withSchedulerPool[T](body: => T): T = {
-    val pool = sessionToActivePool.get(parentSession.getSessionHandle)
-    if (pool != null) {
-      sqlContext.sparkContext.setLocalProperty(SparkContext.SPARK_SCHEDULER_POOL, pool)
-    }
-    try {
-      body
-    } finally {
-      if (pool != null) {
-        sqlContext.sparkContext.setLocalProperty(SparkContext.SPARK_SCHEDULER_POOL, null)
-      }
-    }
-  }
 }
 
 object SparkExecuteStatementOperation {
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetCatalogsOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetCatalogsOperation.scala
index 2945cfd..55070e0 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetCatalogsOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetCatalogsOperation.scala
@@ -36,19 +36,13 @@ import org.apache.spark.util.{Utils => SparkUtils}
  * @param parentSession a HiveSession from SessionManager
  */
 private[hive] class SparkGetCatalogsOperation(
-    sqlContext: SQLContext,
+    val sqlContext: SQLContext,
     parentSession: HiveSession)
-  extends GetCatalogsOperation(parentSession) with Logging {
-
-  private var statementId: String = _
-
-  override def close(): Unit = {
-    super.close()
-    HiveThriftServer2.eventManager.onOperationClosed(statementId)
-  }
+  extends GetCatalogsOperation(parentSession)
+  with SparkOperation
+  with Logging {
 
   override def runInternal(): Unit = {
-    statementId = UUID.randomUUID().toString
     val logMsg = "Listing catalogs"
     logInfo(s"$logMsg with $statementId")
     setState(OperationState.RUNNING)
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala
index ff7cbfe..ca8ad5e 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala
@@ -48,26 +48,19 @@ import org.apache.spark.util.{Utils => SparkUtils}
  * @param columnName column name
  */
 private[hive] class SparkGetColumnsOperation(
-    sqlContext: SQLContext,
+    val sqlContext: SQLContext,
     parentSession: HiveSession,
     catalogName: String,
     schemaName: String,
     tableName: String,
     columnName: String)
   extends GetColumnsOperation(parentSession, catalogName, schemaName, tableName, columnName)
-    with Logging {
+  with SparkOperation
+  with Logging {
 
   val catalog: SessionCatalog = sqlContext.sessionState.catalog
 
-  private var statementId: String = _
-
-  override def close(): Unit = {
-    super.close()
-    HiveThriftServer2.eventManager.onOperationClosed(statementId)
-  }
-
   override def runInternal(): Unit = {
-    statementId = UUID.randomUUID().toString
     // Do not change cmdStr. It's used for Hive auditing and authorization.
     val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName, tablePattern : $tableName"
     val logMsg = s"Listing columns '$cmdStr, columnName : $columnName'"
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala
index d9c12b6..f5e647b 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala
@@ -43,22 +43,16 @@ import org.apache.spark.util.{Utils => SparkUtils}
  * @param functionName function name pattern
  */
 private[hive] class SparkGetFunctionsOperation(
-    sqlContext: SQLContext,
+    val sqlContext: SQLContext,
     parentSession: HiveSession,
     catalogName: String,
     schemaName: String,
     functionName: String)
-  extends GetFunctionsOperation(parentSession, catalogName, schemaName, functionName) with Logging {
-
-  private var statementId: String = _
-
-  override def close(): Unit = {
-    super.close()
-    HiveThriftServer2.eventManager.onOperationClosed(statementId)
-  }
+  extends GetFunctionsOperation(parentSession, catalogName, schemaName, functionName)
+  with SparkOperation
+  with Logging {
 
   override def runInternal(): Unit = {
-    statementId = UUID.randomUUID().toString
     // Do not change cmdStr. It's used for Hive auditing and authorization.
     val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName"
     val logMsg = s"Listing functions '$cmdStr, functionName : $functionName'"
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala
index db19880..7422098 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala
@@ -40,21 +40,15 @@ import org.apache.spark.util.{Utils => SparkUtils}
  * @param schemaName database name, null or a concrete database name
  */
 private[hive] class SparkGetSchemasOperation(
-    sqlContext: SQLContext,
+    val sqlContext: SQLContext,
     parentSession: HiveSession,
     catalogName: String,
     schemaName: String)
-  extends GetSchemasOperation(parentSession, catalogName, schemaName) with Logging {
-
-  private var statementId: String = _
-
-  override def close(): Unit = {
-    super.close()
-    HiveThriftServer2.eventManager.onOperationClosed(statementId)
-  }
+  extends GetSchemasOperation(parentSession, catalogName, schemaName)
+  with SparkOperation
+  with Logging {
 
   override def runInternal(): Unit = {
-    statementId = UUID.randomUUID().toString
     // Do not change cmdStr. It's used for Hive auditing and authorization.
     val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName"
     val logMsg = s"Listing databases '$cmdStr'"
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala
index b4093e5..1cf9c3a 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala
@@ -37,16 +37,11 @@ import org.apache.spark.util.{Utils => SparkUtils}
  * @param parentSession a HiveSession from SessionManager
  */
 private[hive] class SparkGetTableTypesOperation(
-    sqlContext: SQLContext,
+    val sqlContext: SQLContext,
     parentSession: HiveSession)
-  extends GetTableTypesOperation(parentSession) with SparkMetadataOperationUtils with Logging {
-
-  private var statementId: String = _
-
-  override def close(): Unit = {
-    super.close()
-    HiveThriftServer2.eventManager.onOperationClosed(statementId)
-  }
+  extends GetTableTypesOperation(parentSession)
+  with SparkOperation
+  with Logging {
 
   override def runInternal(): Unit = {
     statementId = UUID.randomUUID().toString
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala
index 45c6d98..a1d21e2 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala
@@ -46,24 +46,17 @@ import org.apache.spark.util.{Utils => SparkUtils}
  * @param tableTypes list of allowed table types, e.g. "TABLE", "VIEW"
  */
 private[hive] class SparkGetTablesOperation(
-    sqlContext: SQLContext,
+    val sqlContext: SQLContext,
     parentSession: HiveSession,
     catalogName: String,
     schemaName: String,
     tableName: String,
     tableTypes: JList[String])
   extends GetTablesOperation(parentSession, catalogName, schemaName, tableName, tableTypes)
-    with SparkMetadataOperationUtils with Logging {
-
-  private var statementId: String = _
-
-  override def close(): Unit = {
-    super.close()
-    HiveThriftServer2.eventManager.onOperationClosed(statementId)
-  }
+  with SparkOperation
+  with Logging {
 
   override def runInternal(): Unit = {
-    statementId = UUID.randomUUID().toString
     // Do not change cmdStr. It's used for Hive auditing and authorization.
     val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName"
     val tableTypesStr = if (tableTypes == null) "null" else tableTypes.asScala.mkString(",")
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTypeInfoOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTypeInfoOperation.scala
index dd5668a..e38139d 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTypeInfoOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTypeInfoOperation.scala
@@ -36,16 +36,11 @@ import org.apache.spark.util.{Utils => SparkUtils}
  * @param parentSession a HiveSession from SessionManager
  */
 private[hive] class SparkGetTypeInfoOperation(
-    sqlContext: SQLContext,
+    val sqlContext: SQLContext,
     parentSession: HiveSession)
-  extends GetTypeInfoOperation(parentSession) with Logging {
-
-  private var statementId: String = _
-
-  override def close(): Unit = {
-    super.close()
-    HiveThriftServer2.eventManager.onOperationClosed(statementId)
-  }
+  extends GetTypeInfoOperation(parentSession)
+  with SparkOperation
+  with Logging {
 
   override def runInternal(): Unit = {
     statementId = UUID.randomUUID().toString
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationUtils.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationUtils.scala
deleted file mode 100644
index f4c4b04..0000000
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationUtils.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.thriftserver
-
-import org.apache.spark.sql.catalyst.catalog.CatalogTableType
-import org.apache.spark.sql.catalyst.catalog.CatalogTableType.{EXTERNAL, MANAGED, VIEW}
-
-/**
- * Utils for metadata operations.
- */
-private[hive] trait SparkMetadataOperationUtils {
-
-  def tableTypeString(tableType: CatalogTableType): String = tableType match {
-    case EXTERNAL | MANAGED => "TABLE"
-    case VIEW => "VIEW"
-    case t =>
-      throw new IllegalArgumentException(s"Unknown table type is found: $t")
-  }
-}
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala
new file mode 100644
index 0000000..3da568c
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.thriftserver
+
+import org.apache.hive.service.cli.OperationState
+import org.apache.hive.service.cli.operation.Operation
+
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{SparkSession, SQLContext}
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType.{EXTERNAL, MANAGED, VIEW}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.Utils
+
+/**
+ * Utils for Spark operations.
+ */
+private[hive] trait SparkOperation extends Operation with Logging {
+
+  protected def sqlContext: SQLContext
+
+  protected var statementId = getHandle().getHandleIdentifier().getPublicId().toString()
+
+  protected def cleanup(): Unit = Unit // noop by default
+
+  abstract override def run(): Unit = {
+    withLocalProperties {
+      super.run()
+    }
+  }
+
+  abstract override def close(): Unit = {
+    cleanup()
+    super.close()
+    logInfo(s"Close statement with $statementId")
+    HiveThriftServer2.eventManager.onOperationClosed(statementId)
+  }
+
+  // Set thread local properties for the execution of the operation.
+  // This method should be applied during the execution of the operation, by all the child threads.
+  // The original spark context local properties will be restored after the operation.
+  //
+  // It is used to:
+  // - set appropriate SparkSession
+  // - set scheduler pool for the operation
+  def withLocalProperties[T](f: => T): T = {
+    val originalProps = Utils.cloneProperties(sqlContext.sparkContext.getLocalProperties)
+    val originalSession = SparkSession.getActiveSession
+
+    try {
+      // Set active SparkSession
+      SparkSession.setActiveSession(sqlContext.sparkSession)
+
+      // Set scheduler pool
+      sqlContext.sparkSession.conf.getOption(SQLConf.THRIFTSERVER_POOL.key) match {
+        case Some(pool) =>
+          sqlContext.sparkContext.setLocalProperty(SparkContext.SPARK_SCHEDULER_POOL, pool)
+        case None =>
+      }
+
+      // run the body
+      f
+    } finally {
+      // reset local properties, will also reset SPARK_SCHEDULER_POOL
+      sqlContext.sparkContext.setLocalProperties(originalProps)
+
+      originalSession match {
+        case Some(session) => SparkSession.setActiveSession(session)
+        case None => SparkSession.clearActiveSession()
+      }
+    }
+  }
+
+  def tableTypeString(tableType: CatalogTableType): String = tableType match {
+    case EXTERNAL | MANAGED => "TABLE"
+    case VIEW => "VIEW"
+    case t =>
+      throw new IllegalArgumentException(s"Unknown table type is found: $t")
+  }
+}
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
index b317189..e10e7ed 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
@@ -78,7 +78,6 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext:
     val ctx = sparkSqlOperationManager.sessionToContexts.getOrDefault(sessionHandle, sqlContext)
     ctx.sparkSession.sessionState.catalog.getTempViewNames().foreach(ctx.uncacheTable)
     super.closeSession(sessionHandle)
-    sparkSqlOperationManager.sessionToActivePool.remove(sessionHandle)
     sparkSqlOperationManager.sessionToContexts.remove(sessionHandle)
   }
 
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
index 3396560..bc9c13e 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
@@ -38,7 +38,6 @@ private[thriftserver] class SparkSQLOperationManager()
   val handleToOperation = ReflectionUtils
     .getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation")
 
-  val sessionToActivePool = new ConcurrentHashMap[SessionHandle, String]()
   val sessionToContexts = new ConcurrentHashMap[SessionHandle, SQLContext]()
 
   override def newExecuteStatementOperation(
@@ -51,8 +50,8 @@ private[thriftserver] class SparkSQLOperationManager()
       s" initialized or had already closed.")
     val conf = sqlContext.sessionState.conf
     val runInBackground = async && conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
-    val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay,
-      runInBackground)(sqlContext, sessionToActivePool)
+    val operation = new SparkExecuteStatementOperation(
+      sqlContext, parentSession, statement, confOverlay, runInBackground)
     handleToOperation.put(operation.getHandle, operation)
     logDebug(s"Created Operation for $statement with session=$parentSession, " +
       s"runInBackground=$runInBackground")
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index 0cec634..21256ad 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -811,6 +811,61 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
       }
     }
   }
+
+  test("SPARK-31859 Thriftserver works with spark.sql.datetime.java8API.enabled=true") {
+    withJdbcStatement() { statement =>
+      withJdbcStatement() { st =>
+        st.execute("set spark.sql.datetime.java8API.enabled=true")
+        val rs = st.executeQuery("select date '2020-05-28', timestamp '2020-05-28 00:00:00'")
+        rs.next()
+        assert(rs.getDate(1).toString() == "2020-05-28")
+        assert(rs.getTimestamp(2).toString() == "2020-05-28 00:00:00.0")
+      }
+    }
+  }
+
+  test("SPARK-31861 Thriftserver respects spark.sql.session.timeZone") {
+    withJdbcStatement() { statement =>
+      withJdbcStatement() { st =>
+        st.execute("set spark.sql.session.timeZone=+03:15") // different than Thriftserver's JVM tz
+        val rs = st.executeQuery("select timestamp '2020-05-28 10:00:00'")
+        rs.next()
+        // The timestamp as string is the same as the literal
+        assert(rs.getString(1) == "2020-05-28 10:00:00.0")
+        // Parsing it to java.sql.Timestamp in the client will always result in a timestamp
+        // in client default JVM timezone. The string value of the Timestamp will match the literal,
+        // but if the JDBC application cares about the internal timezone and UTC offset of the
+        // Timestamp object, it should set spark.sql.session.timeZone to match its client JVM tz.
+        assert(rs.getTimestamp(1).toString() == "2020-05-28 10:00:00.0")
+      }
+    }
+  }
+
+  test("SPARK-31863 Session conf should persist between Thriftserver worker threads") {
+    val iter = 20
+    withJdbcStatement() { statement =>
+      // date 'now' is resolved during parsing, and relies on SQLConf.get to
+      // obtain the current set timezone. We exploit this to run this test.
+      // If the timezones are set correctly to 25 hours apart across threads,
+      // the dates should reflect this.
+
+      // iterate a few times for the odd chance the same thread is selected
+      for (_ <- 0 until iter) {
+        statement.execute("SET spark.sql.session.timeZone=GMT-12")
+        val firstResult = statement.executeQuery("SELECT date 'now'")
+        firstResult.next()
+        val beyondDateLineWest = firstResult.getDate(1)
+
+        statement.execute("SET spark.sql.session.timeZone=GMT+13")
+        val secondResult = statement.executeQuery("SELECT date 'now'")
+        secondResult.next()
+        val dateLineEast = secondResult.getDate(1)
+        assert(
+          dateLineEast after beyondDateLineWest,
+          "SQLConf changes should persist across execution threads")
+      }
+    }
+  }
 }
 
 class SingleSessionSuite extends HiveThriftJdbcTest {
diff --git a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/ColumnValue.java b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/ColumnValue.java
index a770bea..462b93a 100644
--- a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/ColumnValue.java
+++ b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/ColumnValue.java
@@ -123,22 +123,6 @@ public class ColumnValue {
     return TColumnValue.stringVal(tStringValue);
   }
 
-  private static TColumnValue dateValue(Date value) {
-    TStringValue tStringValue = new TStringValue();
-    if (value != null) {
-      tStringValue.setValue(value.toString());
-    }
-    return new TColumnValue(TColumnValue.stringVal(tStringValue));
-  }
-
-  private static TColumnValue timestampValue(Timestamp value) {
-    TStringValue tStringValue = new TStringValue();
-    if (value != null) {
-      tStringValue.setValue(value.toString());
-    }
-    return TColumnValue.stringVal(tStringValue);
-  }
-
   private static TColumnValue stringValue(HiveIntervalYearMonth value) {
     TStringValue tStrValue = new TStringValue();
     if (value != null) {
@@ -178,9 +162,9 @@ public class ColumnValue {
     case VARCHAR_TYPE:
       return stringValue((HiveVarchar)value);
     case DATE_TYPE:
-      return dateValue((Date)value);
     case TIMESTAMP_TYPE:
-      return timestampValue((Timestamp)value);
+      // SPARK-31859, SPARK-31861: converted to string already in SparkExecuteStatementOperation
+      return stringValue((String)value);
     case INTERVAL_YEAR_MONTH_TYPE:
       return stringValue((HiveIntervalYearMonth) value);
     case INTERVAL_DAY_TIME_TYPE:
diff --git a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/operation/Operation.java b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/operation/Operation.java
index 51bb287..4b33142 100644
--- a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/operation/Operation.java
+++ b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/operation/Operation.java
@@ -280,7 +280,10 @@ public abstract class Operation {
     throw new UnsupportedOperationException("SQLOperation.cancel()");
   }
 
-  public abstract void close() throws HiveSQLException;
+  public void close() throws HiveSQLException {
+    setState(OperationState.CLOSED);
+    cleanupOperationLog();
+  }
 
   public abstract TableSchema getResultSetSchema() throws HiveSQLException;
 
diff --git a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/ColumnValue.java b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/ColumnValue.java
index 53f0465..85adf55 100644
--- a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/ColumnValue.java
+++ b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/ColumnValue.java
@@ -124,22 +124,6 @@ public class ColumnValue {
     return TColumnValue.stringVal(tStringValue);
   }
 
-  private static TColumnValue dateValue(Date value) {
-    TStringValue tStringValue = new TStringValue();
-    if (value != null) {
-      tStringValue.setValue(value.toString());
-    }
-    return new TColumnValue(TColumnValue.stringVal(tStringValue));
-  }
-
-  private static TColumnValue timestampValue(Timestamp value) {
-    TStringValue tStringValue = new TStringValue();
-    if (value != null) {
-      tStringValue.setValue(value.toString());
-    }
-    return TColumnValue.stringVal(tStringValue);
-  }
-
   private static TColumnValue stringValue(HiveIntervalYearMonth value) {
     TStringValue tStrValue = new TStringValue();
     if (value != null) {
@@ -181,9 +165,9 @@ public class ColumnValue {
     case VARCHAR_TYPE:
       return stringValue((HiveVarchar)value);
     case DATE_TYPE:
-      return dateValue((Date)value);
     case TIMESTAMP_TYPE:
-      return timestampValue((Timestamp)value);
+      // SPARK-31859, SPARK-31861: converted to string already in SparkExecuteStatementOperation
+      return stringValue((String)value);
     case INTERVAL_YEAR_MONTH_TYPE:
       return stringValue((HiveIntervalYearMonth) value);
     case INTERVAL_DAY_TIME_TYPE:
diff --git a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/operation/Operation.java b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/operation/Operation.java
index f26c715..558c68f 100644
--- a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/operation/Operation.java
+++ b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/operation/Operation.java
@@ -298,7 +298,10 @@ public abstract class Operation {
     throw new UnsupportedOperationException("SQLOperation.cancel()");
   }
 
-  public abstract void close() throws HiveSQLException;
+  public void close() throws HiveSQLException {
+    setState(OperationState.CLOSED);
+    cleanupOperationLog();
+  }
 
   public abstract TableSchema getResultSetSchema() throws HiveSQLException;
 

