You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2023/02/28 21:48:21 UTC

[spark] branch branch-3.4 updated: [SPARK-42615][CONNECT][FOLLOW-UP] Implement correct version API in SparkSession for Scala client

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new dc5efe1eb07 [SPARK-42615][CONNECT][FOLLOW-UP] Implement correct version API in SparkSession for Scala client
dc5efe1eb07 is described below

commit dc5efe1eb077288488266e1b319cc61c4fb75408
Author: Rui Wang <ru...@databricks.com>
AuthorDate: Tue Feb 28 13:47:48 2023 -0800

    [SPARK-42615][CONNECT][FOLLOW-UP] Implement correct version API in SparkSession for Scala client
    
    ### What changes were proposed in this pull request?
    
    Following up on https://github.com/apache/spark/pull/40210, implement the correct `version` API on the Scala client side.
    
    ### Why are the changes needed?
    
    API coverage
    
    ### Does this PR introduce _any_ user-facing change?
    
    NO
    
    ### How was this patch tested?
    
    Unit test (added a `SparkVersion` test to `ClientE2ETestSuite`).
    
    Closes #40222 from amaliujia/improve_sparksession.
    
    Authored-by: Rui Wang <ru...@databricks.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
    (cherry picked from commit 41bc4e6f8068571bc58186f090717bc645914105)
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../scala/org/apache/spark/sql/SparkSession.scala     |  7 ++++---
 .../spark/sql/connect/client/SparkConnectClient.scala | 19 +++++++++++++------
 .../org/apache/spark/sql/ClientE2ETestSuite.scala     |  4 ++++
 3 files changed, 21 insertions(+), 9 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index ee902870c77..a05dc89a54f 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -24,7 +24,6 @@ import scala.collection.JavaConverters._
 
 import org.apache.arrow.memory.RootAllocator
 
-import org.apache.spark.SPARK_VERSION
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.connect.proto
 import org.apache.spark.internal.Logging
@@ -63,7 +62,9 @@ class SparkSession private[sql] (
 
   private[this] val allocator = new RootAllocator()
 
-  def version: String = SPARK_VERSION
+  lazy val version: String = {
+    client.analyze(proto.AnalyzePlanRequest.AnalyzeCase.SPARK_VERSION).getSparkVersion.getVersion
+  }
 
   /**
    * Runtime configuration interface for Spark.
@@ -257,7 +258,7 @@ class SparkSession private[sql] (
       method: proto.AnalyzePlanRequest.AnalyzeCase,
       explainMode: Option[proto.AnalyzePlanRequest.Explain.ExplainMode] = None)
       : proto.AnalyzePlanResponse = {
-    client.analyze(plan, method, explainMode)
+    client.analyze(method, Some(plan), explainMode)
   }
 
   private[sql] def execute[T](plan: proto.Plan, encoder: AgnosticEncoder[T]): SparkResult[T] = {
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
index 3f9416f216a..cdc0b381a44 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
@@ -90,46 +90,53 @@ private[sql] class SparkConnectClient(
    *   A [[proto.AnalyzePlanResponse]] from the Spark Connect server.
    */
   def analyze(
-      plan: proto.Plan,
       method: proto.AnalyzePlanRequest.AnalyzeCase,
+      plan: Option[proto.Plan] = None,
       explainMode: Option[proto.AnalyzePlanRequest.Explain.ExplainMode] = None)
       : proto.AnalyzePlanResponse = {
     val builder = proto.AnalyzePlanRequest.newBuilder()
     method match {
       case proto.AnalyzePlanRequest.AnalyzeCase.SCHEMA =>
+        assert(plan.isDefined)
         builder.setSchema(
           proto.AnalyzePlanRequest.Schema
             .newBuilder()
-            .setPlan(plan)
+            .setPlan(plan.get)
             .build())
       case proto.AnalyzePlanRequest.AnalyzeCase.EXPLAIN =>
         if (explainMode.isEmpty) {
           throw new IllegalArgumentException(s"ExplainMode is required in Explain request")
         }
+        assert(plan.isDefined)
         builder.setExplain(
           proto.AnalyzePlanRequest.Explain
             .newBuilder()
-            .setPlan(plan)
+            .setPlan(plan.get)
             .setExplainMode(explainMode.get)
             .build())
       case proto.AnalyzePlanRequest.AnalyzeCase.IS_LOCAL =>
+        assert(plan.isDefined)
         builder.setIsLocal(
           proto.AnalyzePlanRequest.IsLocal
             .newBuilder()
-            .setPlan(plan)
+            .setPlan(plan.get)
             .build())
       case proto.AnalyzePlanRequest.AnalyzeCase.IS_STREAMING =>
+        assert(plan.isDefined)
         builder.setIsStreaming(
           proto.AnalyzePlanRequest.IsStreaming
             .newBuilder()
-            .setPlan(plan)
+            .setPlan(plan.get)
             .build())
       case proto.AnalyzePlanRequest.AnalyzeCase.INPUT_FILES =>
+        assert(plan.isDefined)
         builder.setInputFiles(
           proto.AnalyzePlanRequest.InputFiles
             .newBuilder()
-            .setPlan(plan)
+            .setPlan(plan.get)
             .build())
+      case proto.AnalyzePlanRequest.AnalyzeCase.SPARK_VERSION =>
+        builder.setSparkVersion(proto.AnalyzePlanRequest.SparkVersion.newBuilder().build())
       case other => throw new IllegalArgumentException(s"Unknown Analyze request $other")
     }
     val request = builder
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
index 3f00f7c9c36..d47cc3858ab 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
@@ -518,6 +518,10 @@ class ClientE2ETestSuite extends RemoteSparkSession {
     assert(!spark.conf.isModifiable("spark.sql.globalTempDatabase"))
     intercept[Exception](spark.conf.set("spark.sql.globalTempDatabase", "/dev/null"))
   }
+
+  test("SparkVersion") {
+    assert(!spark.version.isEmpty)
+  }
 }
 
 private[sql] case class MyType(id: Long, a: Double, b: Double)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org