You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/02/17 18:19:31 UTC

[spark] branch branch-3.0 updated: [SPARK-30808][SQL] Enable Java 8 time API in Thrift server

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new fbccc2a  [SPARK-30808][SQL] Enable Java 8 time API in Thrift server
fbccc2a is described below

commit fbccc2ac757000ab260d0b04539216f8efbcefa0
Author: Maxim Gekk <ma...@gmail.com>
AuthorDate: Tue Feb 18 02:15:44 2020 +0800

    [SPARK-30808][SQL] Enable Java 8 time API in Thrift server
    
    ### What changes were proposed in this pull request?
    - Set `spark.sql.datetime.java8API.enabled` to `true` in `hiveResultString()`, and restore it at the end of the call.
    - Convert collected `java.time.Instant` & `java.time.LocalDate` to `java.sql.Timestamp` and `java.sql.Date` for correct formatting.
    
    ### Why are the changes needed?
    Because the textual representation of timestamps/dates before the year 1582 is incorrect:
    ```shell
    $ export TZ="America/Los_Angeles"
    $ ./bin/spark-sql -S
    ```
    ```sql
    spark-sql> set spark.sql.session.timeZone=America/Los_Angeles;
    spark.sql.session.timeZone	America/Los_Angeles
    spark-sql> SELECT DATE_TRUNC('MILLENNIUM', DATE '1970-03-20');
    1001-01-01 00:07:02
    ```
    It must be 1001-01-01 00:**00:00**.
    
    ### Does this PR introduce any user-facing change?
    Yes. After the changes:
    ```shell
    $ export TZ="America/Los_Angeles"
    $ ./bin/spark-sql -S
    ```
    ```sql
    spark-sql> set spark.sql.session.timeZone=America/Los_Angeles;
    spark.sql.session.timeZone	America/Los_Angeles
    spark-sql> SELECT DATE_TRUNC('MILLENNIUM', DATE '1970-03-20');
    1001-01-01 00:00:00
    ```
    
    ### How was this patch tested?
    By running the hive-thriftserver tests. In particular:
    ```
    ./build/sbt -Phadoop-2.7 -Phive-2.3 -Phive-thriftserver "hive-thriftserver/test:testOnly *SparkThriftServerProtocolVersionsSuite"
    ```
    
    Closes #27552 from MaxGekk/hive-thriftserver-java8-time-api.
    
    Authored-by: Maxim Gekk <ma...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit afaeb29599593f021c9ea47e52f8c70013a4afef)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../apache/spark/sql/execution/HiveResult.scala    | 61 ++++++++++++++--------
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala |  3 +-
 .../org/apache/spark/sql/SQLQueryTestSuite.scala   |  3 +-
 .../spark/sql/execution/HiveResultSuite.scala      | 25 ++++-----
 .../SparkExecuteStatementOperation.scala           | 10 +++-
 .../sql/hive/thriftserver/SparkSQLDriver.scala     |  5 +-
 .../sql/hive/execution/HiveComparisonTest.scala    |  4 +-
 7 files changed, 67 insertions(+), 44 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala
index 5a2f16d..b191840 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala
@@ -21,9 +21,10 @@ import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}
 import java.time.{Instant, LocalDate}
 
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{Dataset, Row}
 import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
 import org.apache.spark.sql.execution.command.{DescribeCommandBase, ExecutedCommandExec, ShowTablesCommand}
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.CalendarInterval
@@ -36,27 +37,43 @@ object HiveResult {
    * Returns the result as a hive compatible sequence of strings. This is used in tests and
    * `SparkSQLDriver` for CLI applications.
    */
-  def hiveResultString(executedPlan: SparkPlan): Seq[String] = executedPlan match {
-    case ExecutedCommandExec(_: DescribeCommandBase) =>
-      // If it is a describe command for a Hive table, we want to have the output format
-      // be similar with Hive.
-      executedPlan.executeCollectPublic().map {
-        case Row(name: String, dataType: String, comment) =>
-          Seq(name, dataType,
-            Option(comment.asInstanceOf[String]).getOrElse(""))
-            .map(s => String.format(s"%-20s", s))
-            .mkString("\t")
-      }
-    // SHOW TABLES in Hive only output table names, while ours output database, table name, isTemp.
-    case command @ ExecutedCommandExec(s: ShowTablesCommand) if !s.isExtended =>
-      command.executeCollect().map(_.getString(1))
-    case other =>
-      val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq
-      // We need the types so we can output struct field names
-      val types = executedPlan.output.map(_.dataType)
-      // Reformat to match hive tab delimited output.
-      result.map(_.zip(types).map(e => toHiveString(e)))
-        .map(_.mkString("\t"))
+  def hiveResultString(ds: Dataset[_]): Seq[String] = {
+    val executedPlan = ds.queryExecution.executedPlan
+    executedPlan match {
+      case ExecutedCommandExec(_: DescribeCommandBase) =>
+        // If it is a describe command for a Hive table, we want to have the output format
+        // be similar with Hive.
+        executedPlan.executeCollectPublic().map {
+          case Row(name: String, dataType: String, comment) =>
+            Seq(name, dataType,
+              Option(comment.asInstanceOf[String]).getOrElse(""))
+              .map(s => String.format(s"%-20s", s))
+              .mkString("\t")
+        }
+      // SHOW TABLES in Hive only output table names,
+      // while ours output database, table name, isTemp.
+      case command @ ExecutedCommandExec(s: ShowTablesCommand) if !s.isExtended =>
+        command.executeCollect().map(_.getString(1))
+      case _ =>
+        val sessionWithJava8DatetimeEnabled = {
+          val cloned = ds.sparkSession.cloneSession()
+          cloned.conf.set(SQLConf.DATETIME_JAVA8API_ENABLED.key, true)
+          cloned
+        }
+        sessionWithJava8DatetimeEnabled.withActive {
+          // We cannot collect the original dataset because its encoders could be created
+          // with disabled Java 8 date-time API.
+          val result: Seq[Seq[Any]] = Dataset.ofRows(ds.sparkSession, ds.logicalPlan)
+            .queryExecution
+            .executedPlan
+            .executeCollectPublic().map(_.toSeq).toSeq
+          // We need the types so we can output struct field names
+          val types = executedPlan.output.map(_.dataType)
+          // Reformat to match hive tab delimited output.
+          result.map(_.zip(types).map(e => toHiveString(e)))
+            .map(_.mkString("\t"))
+        }
+    }
   }
 
   private lazy val zoneId = DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 11f9724..d6efb2f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -189,8 +189,7 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
           example.split("  > ").toList.foreach(_ match {
             case exampleRe(sql, output) =>
               val df = clonedSpark.sql(sql)
-              val actual = unindentAndTrim(
-                hiveResultString(df.queryExecution.executedPlan).mkString("\n"))
+              val actual = unindentAndTrim(hiveResultString(df).mkString("\n"))
               val expected = unindentAndTrim(output)
               assert(actual === expected)
             case _ =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
index 8328591..34829f1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
@@ -378,7 +378,6 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession {
         localSparkSession.conf.set(SQLConf.ANSI_ENABLED.key, true)
       case _ =>
     }
-    localSparkSession.conf.set(SQLConf.DATETIME_JAVA8API_ENABLED.key, true)
 
     if (configSet.nonEmpty) {
       // Execute the list of set operation in order to add the desired configs
@@ -512,7 +511,7 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession {
     val schema = df.schema.catalogString
     // Get answer, but also get rid of the #1234 expression ids that show up in explain plans
     val answer = SQLExecution.withNewExecutionId(df.queryExecution, Some(sql)) {
-      hiveResultString(df.queryExecution.executedPlan).map(replaceNotIncludedMsg)
+      hiveResultString(df).map(replaceNotIncludedMsg)
     }
 
     // If the output is not pre-sorted, sort it.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala
index bb59b12..bddd15c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala
@@ -25,11 +25,10 @@ class HiveResultSuite extends SharedSparkSession {
   test("date formatting in hive result") {
     val dates = Seq("2018-12-28", "1582-10-13", "1582-10-14", "1582-10-15")
     val df = dates.toDF("a").selectExpr("cast(a as date) as b")
-    val executedPlan1 = df.queryExecution.executedPlan
-    val result = HiveResult.hiveResultString(executedPlan1)
+    val result = HiveResult.hiveResultString(df)
     assert(result == dates)
-    val executedPlan2 = df.selectExpr("array(b)").queryExecution.executedPlan
-    val result2 = HiveResult.hiveResultString(executedPlan2)
+    val df2 = df.selectExpr("array(b)")
+    val result2 = HiveResult.hiveResultString(df2)
     assert(result2 == dates.map(x => s"[$x]"))
   }
 
@@ -40,11 +39,10 @@ class HiveResultSuite extends SharedSparkSession {
       "1582-10-14 01:02:03",
       "1582-10-15 01:02:03")
     val df = timestamps.toDF("a").selectExpr("cast(a as timestamp) as b")
-    val executedPlan1 = df.queryExecution.executedPlan
-    val result = HiveResult.hiveResultString(executedPlan1)
+    val result = HiveResult.hiveResultString(df)
     assert(result == timestamps)
-    val executedPlan2 = df.selectExpr("array(b)").queryExecution.executedPlan
-    val result2 = HiveResult.hiveResultString(executedPlan2)
+    val df2 = df.selectExpr("array(b)")
+    val result2 = HiveResult.hiveResultString(df2)
     assert(result2 == timestamps.map(x => s"[$x]"))
   }
 
@@ -57,15 +55,14 @@ class HiveResultSuite extends SharedSparkSession {
   test("decimal formatting in hive result") {
     val df = Seq(new java.math.BigDecimal("1")).toDS()
     Seq(2, 6, 18).foreach { scala =>
-      val executedPlan =
-        df.selectExpr(s"CAST(value AS decimal(38, $scala))").queryExecution.executedPlan
-      val result = HiveResult.hiveResultString(executedPlan)
+      val decimalDf = df.selectExpr(s"CAST(value AS decimal(38, $scala))")
+      val result = HiveResult.hiveResultString(decimalDf)
       assert(result.head.split("\\.").last.length === scala)
     }
 
-    val executedPlan = Seq(java.math.BigDecimal.ZERO).toDS()
-      .selectExpr(s"CAST(value AS decimal(38, 8))").queryExecution.executedPlan
-    val result = HiveResult.hiveResultString(executedPlan)
+    val df2 = Seq(java.math.BigDecimal.ZERO).toDS()
+      .selectExpr(s"CAST(value AS decimal(38, 8))")
+    val result = HiveResult.hiveResultString(df2)
     assert(result.head === "0.00000000")
   }
 }
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index cf0e5eb..7bcd803 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.thriftserver
 
 import java.security.PrivilegedExceptionAction
 import java.sql.{Date, Timestamp}
+import java.time.{Instant, LocalDate}
 import java.util.{Arrays, Map => JMap, UUID}
 import java.util.concurrent.RejectedExecutionException
 
@@ -178,7 +179,14 @@ private[hive] class SparkExecuteStatementOperation(
           }
           curCol += 1
         }
-        resultRowSet.addRow(row.toArray.asInstanceOf[Array[Object]])
+        // Convert date-time instances to types that are acceptable by Hive libs
+        // used in conversions to strings.
+        val resultRow = row.map {
+          case i: Instant => Timestamp.from(i)
+          case ld: LocalDate => Date.valueOf(ld)
+          case other => other
+        }.toArray.asInstanceOf[Array[Object]]
+        resultRowSet.addRow(resultRow)
         curRow += 1
         resultOffset += 1
       }
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
index 12fba0e..64e91f4 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
@@ -60,9 +60,10 @@ private[hive] class SparkSQLDriver(val context: SQLContext = SparkSQLEnv.sqlCont
     // TODO unify the error code
     try {
       context.sparkContext.setJobDescription(command)
-      val execution = context.sessionState.executePlan(context.sql(command).logicalPlan)
+      val df = context.sql(command)
+      val execution = df.queryExecution
       hiveResponse = SQLExecution.withNewExecutionId(execution) {
-        hiveResultString(execution.executedPlan)
+        hiveResultString(df)
       }
       tableSchema = getResultSetSchema(execution)
       new CommandProcessorResponse(0)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index 8b1f4c9..82fe274 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -346,7 +346,9 @@ abstract class HiveComparisonTest
         val catalystResults = queryList.zip(hiveResults).map { case (queryString, hive) =>
           val query = new TestHiveQueryExecution(queryString.replace("../../data", testDataPath))
           def getResult(): Seq[String] = {
-            SQLExecution.withNewExecutionId(query)(hiveResultString(query.executedPlan))
+            SQLExecution.withNewExecutionId(query) {
+              hiveResultString(Dataset.ofRows(query.sparkSession, query.logical))
+            }
           }
           try { (query, prepareAnswer(query, getResult())) } catch {
             case e: Throwable =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org