You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2023/02/28 21:48:21 UTC
[spark] branch branch-3.4 updated: [SPARK-42615][CONNECT][FOLLOW-UP] Implement correct version API in SparkSession for Scala client
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new dc5efe1eb07 [SPARK-42615][CONNECT][FOLLOW-UP] Implement correct version API in SparkSession for Scala client
dc5efe1eb07 is described below
commit dc5efe1eb077288488266e1b319cc61c4fb75408
Author: Rui Wang <ru...@databricks.com>
AuthorDate: Tue Feb 28 13:47:48 2023 -0800
[SPARK-42615][CONNECT][FOLLOW-UP] Implement correct version API in SparkSession for Scala client
### What changes were proposed in this pull request?
Following up on https://github.com/apache/spark/pull/40210, implement the correct `version` API on the Scala client side.
### Why are the changes needed?
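API coverage: `SparkSession.version` in the Scala client now returns the version reported by the Spark Connect server instead of the client-side `SPARK_VERSION` constant.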
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
Unit test: added a `SparkVersion` test to `ClientE2ETestSuite`.
Closes #40222 from amaliujia/improve_sparksession.
Authored-by: Rui Wang <ru...@databricks.com>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
(cherry picked from commit 41bc4e6f8068571bc58186f090717bc645914105)
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../scala/org/apache/spark/sql/SparkSession.scala | 7 ++++---
.../spark/sql/connect/client/SparkConnectClient.scala | 19 +++++++++++++------
.../org/apache/spark/sql/ClientE2ETestSuite.scala | 4 ++++
3 files changed, 21 insertions(+), 9 deletions(-)
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index ee902870c77..a05dc89a54f 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -24,7 +24,6 @@ import scala.collection.JavaConverters._
import org.apache.arrow.memory.RootAllocator
-import org.apache.spark.SPARK_VERSION
import org.apache.spark.annotation.Experimental
import org.apache.spark.connect.proto
import org.apache.spark.internal.Logging
@@ -63,7 +62,9 @@ class SparkSession private[sql] (
private[this] val allocator = new RootAllocator()
- def version: String = SPARK_VERSION
+ lazy val version: String = {
+ client.analyze(proto.AnalyzePlanRequest.AnalyzeCase.SPARK_VERSION).getSparkVersion.getVersion
+ }
/**
* Runtime configuration interface for Spark.
@@ -257,7 +258,7 @@ class SparkSession private[sql] (
method: proto.AnalyzePlanRequest.AnalyzeCase,
explainMode: Option[proto.AnalyzePlanRequest.Explain.ExplainMode] = None)
: proto.AnalyzePlanResponse = {
- client.analyze(plan, method, explainMode)
+ client.analyze(method, Some(plan), explainMode)
}
private[sql] def execute[T](plan: proto.Plan, encoder: AgnosticEncoder[T]): SparkResult[T] = {
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
index 3f9416f216a..cdc0b381a44 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
@@ -90,46 +90,53 @@ private[sql] class SparkConnectClient(
* A [[proto.AnalyzePlanResponse]] from the Spark Connect server.
*/
def analyze(
- plan: proto.Plan,
method: proto.AnalyzePlanRequest.AnalyzeCase,
+ plan: Option[proto.Plan] = None,
explainMode: Option[proto.AnalyzePlanRequest.Explain.ExplainMode] = None)
: proto.AnalyzePlanResponse = {
val builder = proto.AnalyzePlanRequest.newBuilder()
method match {
case proto.AnalyzePlanRequest.AnalyzeCase.SCHEMA =>
+ assert(plan.isDefined)
builder.setSchema(
proto.AnalyzePlanRequest.Schema
.newBuilder()
- .setPlan(plan)
+ .setPlan(plan.get)
.build())
case proto.AnalyzePlanRequest.AnalyzeCase.EXPLAIN =>
if (explainMode.isEmpty) {
throw new IllegalArgumentException(s"ExplainMode is required in Explain request")
}
+ assert(plan.isDefined)
builder.setExplain(
proto.AnalyzePlanRequest.Explain
.newBuilder()
- .setPlan(plan)
+ .setPlan(plan.get)
.setExplainMode(explainMode.get)
.build())
case proto.AnalyzePlanRequest.AnalyzeCase.IS_LOCAL =>
+ assert(plan.isDefined)
builder.setIsLocal(
proto.AnalyzePlanRequest.IsLocal
.newBuilder()
- .setPlan(plan)
+ .setPlan(plan.get)
.build())
case proto.AnalyzePlanRequest.AnalyzeCase.IS_STREAMING =>
+ assert(plan.isDefined)
builder.setIsStreaming(
proto.AnalyzePlanRequest.IsStreaming
.newBuilder()
- .setPlan(plan)
+ .setPlan(plan.get)
.build())
case proto.AnalyzePlanRequest.AnalyzeCase.INPUT_FILES =>
+ assert(plan.isDefined)
builder.setInputFiles(
proto.AnalyzePlanRequest.InputFiles
.newBuilder()
- .setPlan(plan)
+ .setPlan(plan.get)
.build())
+ case proto.AnalyzePlanRequest.AnalyzeCase.SPARK_VERSION =>
+ builder.setSparkVersion(proto.AnalyzePlanRequest.SparkVersion.newBuilder().build())
case other => throw new IllegalArgumentException(s"Unknown Analyze request $other")
}
val request = builder
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
index 3f00f7c9c36..d47cc3858ab 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
@@ -518,6 +518,10 @@ class ClientE2ETestSuite extends RemoteSparkSession {
assert(!spark.conf.isModifiable("spark.sql.globalTempDatabase"))
intercept[Exception](spark.conf.set("spark.sql.globalTempDatabase", "/dev/null"))
}
+
+ test("SparkVersion") {
+ assert(!spark.version.isEmpty)
+ }
}
private[sql] case class MyType(id: Long, a: Double, b: Double)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org