You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/03/03 06:55:57 UTC
[spark] branch branch-3.0 updated: Revert "[SPARK-30808][SQL]
Enable Java 8 time API in Thrift server"
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 4fa447c Revert "[SPARK-30808][SQL] Enable Java 8 time API in Thrift server"
4fa447c is described below
commit 4fa447cabc1c09170edd9da0e586660a7ae0db74
Author: Kent Yao <ya...@hotmail.com>
AuthorDate: Tue Mar 3 14:21:20 2020 +0800
Revert "[SPARK-30808][SQL] Enable Java 8 time API in Thrift server"
This reverts commit afaeb29599593f021c9ea47e52f8c70013a4afef.
### What changes were proposed in this pull request?
Based on the result and comment from https://github.com/apache/spark/pull/27552#discussion_r385531744
In the hive module, the server side produces datetime values simply by calling `value.toString`, and the client side regenerates the results in `HiveBaseResultSet` with `java.sql.Date.valueOf` / `java.sql.Timestamp.valueOf`.
There will be an inconsistency between the client and the server if we use the Java 8 time APIs.
### Why are the changes needed?
The change being reverted is still not clear enough.
### Does this PR introduce any user-facing change?
no
### How was this patch tested?
Nah
Closes #27733 from yaooqinn/SPARK-30808.
Authored-by: Kent Yao <ya...@hotmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit 1fac06c4307c8c7a5a48a50952d48ee5b9ebccb2)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../apache/spark/sql/execution/HiveResult.scala | 61 ++++++++--------------
.../scala/org/apache/spark/sql/SQLQuerySuite.scala | 3 +-
.../org/apache/spark/sql/SQLQueryTestSuite.scala | 2 +-
.../spark/sql/execution/HiveResultSuite.scala | 25 +++++----
.../SparkExecuteStatementOperation.scala | 10 +---
.../sql/hive/thriftserver/SparkSQLDriver.scala | 5 +-
.../sql/hive/execution/HiveComparisonTest.scala | 4 +-
7 files changed, 43 insertions(+), 67 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala
index b191840..5a2f16d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala
@@ -21,10 +21,9 @@ import java.nio.charset.StandardCharsets
import java.sql.{Date, Timestamp}
import java.time.{Instant, LocalDate}
-import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
import org.apache.spark.sql.execution.command.{DescribeCommandBase, ExecutedCommandExec, ShowTablesCommand}
-import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.CalendarInterval
@@ -37,43 +36,27 @@ object HiveResult {
* Returns the result as a hive compatible sequence of strings. This is used in tests and
* `SparkSQLDriver` for CLI applications.
*/
- def hiveResultString(ds: Dataset[_]): Seq[String] = {
- val executedPlan = ds.queryExecution.executedPlan
- executedPlan match {
- case ExecutedCommandExec(_: DescribeCommandBase) =>
- // If it is a describe command for a Hive table, we want to have the output format
- // be similar with Hive.
- executedPlan.executeCollectPublic().map {
- case Row(name: String, dataType: String, comment) =>
- Seq(name, dataType,
- Option(comment.asInstanceOf[String]).getOrElse(""))
- .map(s => String.format(s"%-20s", s))
- .mkString("\t")
- }
- // SHOW TABLES in Hive only output table names,
- // while ours output database, table name, isTemp.
- case command @ ExecutedCommandExec(s: ShowTablesCommand) if !s.isExtended =>
- command.executeCollect().map(_.getString(1))
- case _ =>
- val sessionWithJava8DatetimeEnabled = {
- val cloned = ds.sparkSession.cloneSession()
- cloned.conf.set(SQLConf.DATETIME_JAVA8API_ENABLED.key, true)
- cloned
- }
- sessionWithJava8DatetimeEnabled.withActive {
- // We cannot collect the original dataset because its encoders could be created
- // with disabled Java 8 date-time API.
- val result: Seq[Seq[Any]] = Dataset.ofRows(ds.sparkSession, ds.logicalPlan)
- .queryExecution
- .executedPlan
- .executeCollectPublic().map(_.toSeq).toSeq
- // We need the types so we can output struct field names
- val types = executedPlan.output.map(_.dataType)
- // Reformat to match hive tab delimited output.
- result.map(_.zip(types).map(e => toHiveString(e)))
- .map(_.mkString("\t"))
- }
- }
+ def hiveResultString(executedPlan: SparkPlan): Seq[String] = executedPlan match {
+ case ExecutedCommandExec(_: DescribeCommandBase) =>
+ // If it is a describe command for a Hive table, we want to have the output format
+ // be similar with Hive.
+ executedPlan.executeCollectPublic().map {
+ case Row(name: String, dataType: String, comment) =>
+ Seq(name, dataType,
+ Option(comment.asInstanceOf[String]).getOrElse(""))
+ .map(s => String.format(s"%-20s", s))
+ .mkString("\t")
+ }
+ // SHOW TABLES in Hive only output table names, while ours output database, table name, isTemp.
+ case command @ ExecutedCommandExec(s: ShowTablesCommand) if !s.isExtended =>
+ command.executeCollect().map(_.getString(1))
+ case other =>
+ val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq
+ // We need the types so we can output struct field names
+ val types = executedPlan.output.map(_.dataType)
+ // Reformat to match hive tab delimited output.
+ result.map(_.zip(types).map(e => toHiveString(e)))
+ .map(_.mkString("\t"))
}
private lazy val zoneId = DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 87de8f5..563b4d1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -190,7 +190,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
example.split(" > ").toList.foreach(_ match {
case exampleRe(sql, output) =>
val df = clonedSpark.sql(sql)
- val actual = unindentAndTrim(hiveResultString(df).mkString("\n"))
+ val actual = unindentAndTrim(
+ hiveResultString(df.queryExecution.executedPlan).mkString("\n"))
val expected = unindentAndTrim(output)
assert(actual === expected)
case _ =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
index 34829f1..6c66166 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
@@ -511,7 +511,7 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession {
val schema = df.schema.catalogString
// Get answer, but also get rid of the #1234 expression ids that show up in explain plans
val answer = SQLExecution.withNewExecutionId(df.queryExecution, Some(sql)) {
- hiveResultString(df).map(replaceNotIncludedMsg)
+ hiveResultString(df.queryExecution.executedPlan).map(replaceNotIncludedMsg)
}
// If the output is not pre-sorted, sort it.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala
index bddd15c..bb59b12 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala
@@ -25,10 +25,11 @@ class HiveResultSuite extends SharedSparkSession {
test("date formatting in hive result") {
val dates = Seq("2018-12-28", "1582-10-13", "1582-10-14", "1582-10-15")
val df = dates.toDF("a").selectExpr("cast(a as date) as b")
- val result = HiveResult.hiveResultString(df)
+ val executedPlan1 = df.queryExecution.executedPlan
+ val result = HiveResult.hiveResultString(executedPlan1)
assert(result == dates)
- val df2 = df.selectExpr("array(b)")
- val result2 = HiveResult.hiveResultString(df2)
+ val executedPlan2 = df.selectExpr("array(b)").queryExecution.executedPlan
+ val result2 = HiveResult.hiveResultString(executedPlan2)
assert(result2 == dates.map(x => s"[$x]"))
}
@@ -39,10 +40,11 @@ class HiveResultSuite extends SharedSparkSession {
"1582-10-14 01:02:03",
"1582-10-15 01:02:03")
val df = timestamps.toDF("a").selectExpr("cast(a as timestamp) as b")
- val result = HiveResult.hiveResultString(df)
+ val executedPlan1 = df.queryExecution.executedPlan
+ val result = HiveResult.hiveResultString(executedPlan1)
assert(result == timestamps)
- val df2 = df.selectExpr("array(b)")
- val result2 = HiveResult.hiveResultString(df2)
+ val executedPlan2 = df.selectExpr("array(b)").queryExecution.executedPlan
+ val result2 = HiveResult.hiveResultString(executedPlan2)
assert(result2 == timestamps.map(x => s"[$x]"))
}
@@ -55,14 +57,15 @@ class HiveResultSuite extends SharedSparkSession {
test("decimal formatting in hive result") {
val df = Seq(new java.math.BigDecimal("1")).toDS()
Seq(2, 6, 18).foreach { scala =>
- val decimalDf = df.selectExpr(s"CAST(value AS decimal(38, $scala))")
- val result = HiveResult.hiveResultString(decimalDf)
+ val executedPlan =
+ df.selectExpr(s"CAST(value AS decimal(38, $scala))").queryExecution.executedPlan
+ val result = HiveResult.hiveResultString(executedPlan)
assert(result.head.split("\\.").last.length === scala)
}
- val df2 = Seq(java.math.BigDecimal.ZERO).toDS()
- .selectExpr(s"CAST(value AS decimal(38, 8))")
- val result = HiveResult.hiveResultString(df2)
+ val executedPlan = Seq(java.math.BigDecimal.ZERO).toDS()
+ .selectExpr(s"CAST(value AS decimal(38, 8))").queryExecution.executedPlan
+ val result = HiveResult.hiveResultString(executedPlan)
assert(result.head === "0.00000000")
}
}
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index 7bcd803..cf0e5eb 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.hive.thriftserver
import java.security.PrivilegedExceptionAction
import java.sql.{Date, Timestamp}
-import java.time.{Instant, LocalDate}
import java.util.{Arrays, Map => JMap, UUID}
import java.util.concurrent.RejectedExecutionException
@@ -179,14 +178,7 @@ private[hive] class SparkExecuteStatementOperation(
}
curCol += 1
}
- // Convert date-time instances to types that are acceptable by Hive libs
- // used in conversions to strings.
- val resultRow = row.map {
- case i: Instant => Timestamp.from(i)
- case ld: LocalDate => Date.valueOf(ld)
- case other => other
- }.toArray.asInstanceOf[Array[Object]]
- resultRowSet.addRow(resultRow)
+ resultRowSet.addRow(row.toArray.asInstanceOf[Array[Object]])
curRow += 1
resultOffset += 1
}
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
index 64e91f4..12fba0e 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
@@ -60,10 +60,9 @@ private[hive] class SparkSQLDriver(val context: SQLContext = SparkSQLEnv.sqlCont
// TODO unify the error code
try {
context.sparkContext.setJobDescription(command)
- val df = context.sql(command)
- val execution = df.queryExecution
+ val execution = context.sessionState.executePlan(context.sql(command).logicalPlan)
hiveResponse = SQLExecution.withNewExecutionId(execution) {
- hiveResultString(df)
+ hiveResultString(execution.executedPlan)
}
tableSchema = getResultSetSchema(execution)
new CommandProcessorResponse(0)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index 82fe274..8b1f4c9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -346,9 +346,7 @@ abstract class HiveComparisonTest
val catalystResults = queryList.zip(hiveResults).map { case (queryString, hive) =>
val query = new TestHiveQueryExecution(queryString.replace("../../data", testDataPath))
def getResult(): Seq[String] = {
- SQLExecution.withNewExecutionId(query) {
- hiveResultString(Dataset.ofRows(query.sparkSession, query.logical))
- }
+ SQLExecution.withNewExecutionId(query)(hiveResultString(query.executedPlan))
}
try { (query, prepareAnswer(query, getResult())) } catch {
case e: Throwable =>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org