Posted to commits@spark.apache.org by hv...@apache.org on 2023/03/04 01:45:33 UTC

[spark] branch master updated: [SPARK-42667][CONNECT] Spark Connect: newSession API

This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 00b7ca30740 [SPARK-42667][CONNECT] Spark Connect: newSession API
00b7ca30740 is described below

commit 00b7ca3074094822b3b5b3da1b292c6d25dca220
Author: Rui Wang <ru...@databricks.com>
AuthorDate: Fri Mar 3 21:45:18 2023 -0400

    [SPARK-42667][CONNECT] Spark Connect: newSession API
    
    ### What changes were proposed in this pull request?
    
    This PR proposes an implementation of the newSession API. The idea is to reuse the user context (e.g. user_id), the gRPC channel, etc., but to differentiate Spark Remote Sessions by a randomly generated client id.
    
    This approach has two benefits (see the sketch after this list):
    1. The gRPC channel is reused, so we do not open too many connections to the server.
    2. Each user can have multiple remote sessions, differentiated by client ids (called session ids on the server side).
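    
    Below is a minimal usage sketch. Only `newSession()` and the `client.copy()` semantics come from this patch; the connection setup (the `remote(...)` call and the address) is assumed for illustration.
    
    ```scala
    import org.apache.spark.sql.SparkSession
    
    object NewSessionExample {
      def main(args: Array[String]): Unit = {
        // Assumed connection setup; any reachable Spark Connect endpoint works.
        val spark = SparkSession.builder().remote("sc://localhost:15002").build()
    
        // newSession() copies the underlying SparkConnectClient: the user
        // context, gRPC channel and user agent are reused, while the copy is
        // assigned a freshly generated client id.
        val session2 = spark.newSession()
    
        // Both sessions share one gRPC connection, but the server keys its
        // state by client id, so they behave as independent sessions.
        spark.sql("SELECT 1").collect()
        session2.sql("SELECT 1").collect()
      }
    }
    ```
    
    Reusing the channel is what keeps the connection count per user at one, no matter how many sessions are created this way.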
    
    ### Why are the changes needed?
    
    API coverage
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Unit test added in ClientE2ETestSuite.
    
    Closes #40272 from amaliujia/new_session.
    
    Authored-by: Rui Wang <ru...@databricks.com>
    Signed-off-by: Herman van Hovell <he...@databricks.com>
---
 .../jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala      | 2 +-
 .../org/apache/spark/sql/connect/client/SparkConnectClient.scala    | 4 ++++
 .../src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala    | 6 ++++++
 3 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index a8a88d63b1a..2b032b7cc8a 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -344,7 +344,7 @@ class SparkSession private[sql] (
   // scalastyle:on
 
   def newSession(): SparkSession = {
-    throw new UnsupportedOperationException("newSession is not supported")
+    SparkSession.builder().client(client.copy()).build()
   }
 
   private def range(
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
index 599aab441de..8828a4a87e6 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
@@ -149,6 +149,10 @@ private[sql] class SparkConnectClient(
     analyze(request)
   }
 
+  def copy(): SparkConnectClient = {
+    new SparkConnectClient(userContext, channel, userAgent)
+  }
+
   /**
    * Add a single artifact to the client session.
    *
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
index ffbf3cee025..a3f1de55892 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
@@ -586,6 +586,12 @@ class ClientE2ETestSuite extends RemoteSparkSession {
       list.asScala.map(kv => Row(kv.key, kv.value)),
       session.createDataFrame(list.asScala.toSeq))
   }
+
+  test("SparkSession newSession") {
+    val oldId = spark.sql("SELECT 1").analyze.getClientId
+    val newId = spark.newSession().sql("SELECT 1").analyze.getClientId
+    assert(oldId != newId)
+  }
 }
 
 private[sql] case class MyType(id: Long, a: Double, b: Double)

