Posted to reviews@spark.apache.org by "hvanhovell (via GitHub)" <gi...@apache.org> on 2023/08/07 02:06:33 UTC

[GitHub] [spark] hvanhovell opened a new pull request, #42367: [SPARK-43429][CONNECT] Add Default & Active SparkSession for Scala Client

hvanhovell opened a new pull request, #42367:
URL: https://github.com/apache/spark/pull/42367

   ### What changes were proposed in this pull request?
   This adds the `default` and `active` session variables to `SparkSession`:
   - The `default` session is a global value. It is typically the first session created through `getOrCreate`. It can be changed through `setDefaultSession` or `clearDefaultSession`. If the `default` session is closed, we clear the `default` session.
   - The `active` session is a thread-local value. It is typically the first session created in this thread, or it inherits its value from its parent thread. It can be changed through `setActiveSession` or `clearActiveSession`. Note that these methods operate thread-locally, so they won't change the value in the parent thread or in existing child threads. If the `active` session of the current thread is closed, we clear the active value (only for the current thread!).
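A minimal sketch of the two scopes described above. It assumes a hypothetical `Session` class standing in for `SparkSession`, an `AtomicReference` for the global default, and an `InheritableThreadLocal` for the per-thread active session. It is not the code from this PR.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical stand-in for the Connect client's SparkSession.
final class Session(val name: String)

object SessionState {
  // Global default: one value shared by all threads.
  private val defaultSession = new AtomicReference[Session](null)

  // Per-thread active session. A new thread copies its parent's value when it
  // is constructed; after that, the two values change independently.
  private val activeThreadSession = new InheritableThreadLocal[Session]

  def getDefaultSession: Option[Session] = Option(defaultSession.get())
  def setDefaultSession(session: Session): Unit = defaultSession.set(session)
  def clearDefaultSession(): Unit = defaultSession.set(null)

  def getActiveSession: Option[Session] = Option(activeThreadSession.get())
  def setActiveSession(session: Session): Unit = activeThreadSession.set(session)
  // Only affects the calling thread.
  def clearActiveSession(): Unit = activeThreadSession.remove()
}
```

Using an `AtomicReference` for the default lets close-time cleanup use `compareAndSet`, as the diff does, so that a session is cleared only if it is still the default.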
   
   ### Why are the changes needed?
   To increase compatibility with the existing SparkSession API in `sql/core`.
   
   ### Does this PR introduce _any_ user-facing change?
   Yes. It adds a couple of methods that were missing from the Scala Client.
   
   ### How was this patch tested?
   Added tests to `SparkSessionSuite`.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a diff in pull request #42367: [SPARK-43429][CONNECT] Add Default & Active SparkSession for Scala Client

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #42367:
URL: https://github.com/apache/spark/pull/42367#discussion_r1285319207


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala:
##########
@@ -742,8 +748,17 @@ object SparkSession extends Logging {
    */
   private[sql] def onSessionClose(session: SparkSession): Unit = {
     sessions.invalidate(session.client.configuration)
+    defaultSession.compareAndSet(session, null)
+    if (getActiveSession.contains(session)) {
+      clearActiveSession()
+    }
   }
 
+  /**
+   * Creates a [[SparkSession.Builder]] for constructing a [[SparkSession]].
+   *
+   * @since 3.4.0

Review Comment:
   ```suggestion
      * @since 3.5.0
   ```







[GitHub] [spark] hvanhovell commented on pull request #42367: [SPARK-43429][CONNECT] Add Default & Active SparkSession for Scala Client

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on PR #42367:
URL: https://github.com/apache/spark/pull/42367#issuecomment-1669397294

   @LuciferYang I pressed the wrong button while doing the backport, thanks for letting me know. As for flakiness, the whole point of using a `Phaser` was to avoid this.
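A minimal sketch of how a `java.util.concurrent.Phaser` can coordinate a multi-threaded active-session check. A barrier forces the interleaving under test instead of relying on timing. It assumes a hypothetical `Session` class and a hypothetical `Active` holder, and it is not the actual `SparkSessionSuite` test.

```scala
import java.util.concurrent.Phaser

// Hypothetical stand-in for a session object.
final class Session(val id: Int)

// Hypothetical thread-local holder for the active session.
object Active {
  private val local = new InheritableThreadLocal[Session]
  def get: Option[Session] = Option(local.get())
  def set(s: Session): Unit = local.set(s)
}

object PhaserDemo {
  // Each worker sets its own active session, then waits until every worker has
  // done the same before reading its value back. If thread-local isolation were
  // broken, some worker would now observe another worker's session.
  def run(n: Int): Seq[Boolean] = {
    val phaser = new Phaser(n) // n registered parties, one per worker
    val results = new Array[Boolean](n)
    val threads = (0 until n).map { i =>
      new Thread(() => {
        val mine = new Session(i)
        Active.set(mine)
        phaser.arriveAndAwaitAdvance() // every worker has now set a session
        results(i) = Active.get.contains(mine)
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join()) // join() makes the workers' writes visible here
    results.toSeq
  }
}
```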




[GitHub] [spark] hvanhovell commented on pull request #42367: [SPARK-43429][CONNECT] Add Default & Active SparkSession for Scala Client

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on PR #42367:
URL: https://github.com/apache/spark/pull/42367#issuecomment-1668807299

   Merging to master/3.5




[GitHub] [spark] hvanhovell commented on a diff in pull request #42367: [SPARK-43429][CONNECT] Add Default & Active SparkSession for Scala Client

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #42367:
URL: https://github.com/apache/spark/pull/42367#discussion_r1285324320


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala:
##########
@@ -811,30 +826,84 @@ object SparkSession extends Logging {
      * If a session exist with the same configuration that is returned instead of creating a new
      * session.
      *
+     * This method will update the default and/or active session if they are not set.
+     *
      * @since 3.5.0
      */
     def getOrCreate(): SparkSession = {

Review Comment:
   We don't support this in the Scala client.





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #42367: [SPARK-43429][CONNECT] Add Default & Active SparkSession for Scala Client

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #42367:
URL: https://github.com/apache/spark/pull/42367#discussion_r1285319137


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala:
##########
@@ -811,30 +826,84 @@ object SparkSession extends Logging {
      * If a session exist with the same configuration that is returned instead of creating a new
      * session.
      *
+     * This method will update the default and/or active session if they are not set.
+     *
      * @since 3.5.0
      */
     def getOrCreate(): SparkSession = {
-      tryCreateSessionFromClient().getOrElse(sessions.get(builder.configuration))
+      val session = tryCreateSessionFromClient()
+        .getOrElse(sessions.get(builder.configuration))
+      // To be compatible with the SQL API we update the default
+      // and active session here if they are not set.
+      defaultSession.compareAndSet(null, session)
+      if (getActiveSession.isEmpty) {
+        setActiveSession(session)

Review Comment:
   We should probably either do the same in `create`, or revert that behaviour in PySpark (see also https://github.com/apache/spark/pull/41987), cc @cdkrot @grundprinzip 





[GitHub] [spark] hvanhovell commented on pull request #42367: [SPARK-43429][CONNECT] Add Default & Active SparkSession for Scala Client

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on PR #42367:
URL: https://github.com/apache/spark/pull/42367#issuecomment-1667080198

   cc @HyukjinKwon 




[GitHub] [spark] hvanhovell commented on a diff in pull request #42367: [SPARK-43429][CONNECT] Add Default & Active SparkSession for Scala Client

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #42367:
URL: https://github.com/apache/spark/pull/42367#discussion_r1285321888


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala:
##########
@@ -811,30 +826,84 @@ object SparkSession extends Logging {
      * If a session exist with the same configuration that is returned instead of creating a new
      * session.
      *
+     * This method will update the default and/or active session if they are not set.
+     *
      * @since 3.5.0
      */
     def getOrCreate(): SparkSession = {
-      tryCreateSessionFromClient().getOrElse(sessions.get(builder.configuration))
+      val session = tryCreateSessionFromClient()
+        .getOrElse(sessions.get(builder.configuration))
+      // To be compatible with the SQL API we update the default
+      // and active session here if they are not set.
+      defaultSession.compareAndSet(null, session)
+      if (getActiveSession.isEmpty) {
+        setActiveSession(session)
+      }
+      session
     }
   }
 
-  def getActiveSession: Option[SparkSession] = {
-    throw new UnsupportedOperationException("getActiveSession is not supported")
+  /**
+   * Returns the default SparkSession.
+   *
+   * @since 3.5.0
+   */
+  def getDefaultSession: Option[SparkSession] = Option(defaultSession.get())
+
+  /**
+   * Sets the default SparkSession.
+   *
+   * @since 3.5.0
+   */
+  def setDefaultSession(session: SparkSession): Unit = {
+    defaultSession.set(session)
   }
 
-  def getDefaultSession: Option[SparkSession] = {
-    throw new UnsupportedOperationException("getDefaultSession is not supported")
+  /**
+   * Clears the default SparkSession.
+   *
+   * @since 3.5.0
+   */
+  def clearDefaultSession(): Unit = {
+    defaultSession.set(null)
   }
 
+  /**
+   * Returns the active SparkSession for the current thread.
+   *
+   * @since 3.5.0
+   */
+  def getActiveSession: Option[SparkSession] = Option(activeThreadSession.get())

Review Comment:
   That is for internals, because that is how `SQLConf.get` works. The Scala client is very thin, so we don't need these thread locals (we thread the confs through explicitly when we need them). There is also no user code interaction in the internals, so we don't need them for that either.





[GitHub] [spark] hvanhovell commented on a diff in pull request #42367: [SPARK-43429][CONNECT] Add Default & Active SparkSession for Scala Client

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #42367:
URL: https://github.com/apache/spark/pull/42367#discussion_r1285324320


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala:
##########
@@ -811,30 +826,84 @@ object SparkSession extends Logging {
      * If a session exist with the same configuration that is returned instead of creating a new
      * session.
      *
+     * This method will update the default and/or active session if they are not set.
+     *
      * @since 3.5.0
      */
     def getOrCreate(): SparkSession = {

Review Comment:
   We don't support this in the Scala client yet...





[GitHub] [spark] hvanhovell commented on a diff in pull request #42367: [SPARK-43429][CONNECT] Add Default & Active SparkSession for Scala Client

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #42367:
URL: https://github.com/apache/spark/pull/42367#discussion_r1285322233


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala:
##########
@@ -811,30 +826,84 @@ object SparkSession extends Logging {
      * If a session exist with the same configuration that is returned instead of creating a new
      * session.
      *
+     * This method will update the default and/or active session if they are not set.
+     *
      * @since 3.5.0
      */
     def getOrCreate(): SparkSession = {
-      tryCreateSessionFromClient().getOrElse(sessions.get(builder.configuration))
+      val session = tryCreateSessionFromClient()
+        .getOrElse(sessions.get(builder.configuration))
+      // To be compatible with the SQL API we update the default
+      // and active session here if they are not set.
+      defaultSession.compareAndSet(null, session)
+      if (getActiveSession.isEmpty) {
+        setActiveSession(session)

Review Comment:
   ok, will also set it in create.
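A minimal sketch of the shared helper this implies, so that `create` and `getOrCreate` fill the default and active slots the same way, paired with the close-time cleanup from the `onSessionClose` diff. The helper name follows `setDefaultAndActiveSession` from the later revision. The `Session` class and `Sessions` object are hypothetical stand-ins, not the PR's code.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical stand-in for the Connect client's SparkSession.
final class Session(val name: String)

object Sessions {
  private val defaultSession = new AtomicReference[Session](null)
  private val activeThreadSession = new InheritableThreadLocal[Session]

  def getDefaultSession: Option[Session] = Option(defaultSession.get())
  def getActiveSession: Option[Session] = Option(activeThreadSession.get())

  // Shared by create() and getOrCreate(): only fills empty slots, so an
  // explicitly set default or active session is never overwritten.
  def setDefaultAndActiveSession(session: Session): Unit = {
    defaultSession.compareAndSet(null, session)
    if (getActiveSession.isEmpty) activeThreadSession.set(session)
  }

  // Clears the default only if it is this session (reference comparison), and
  // clears the active session only for the calling thread.
  def onSessionClose(session: Session): Unit = {
    defaultSession.compareAndSet(session, null)
    if (getActiveSession.contains(session)) activeThreadSession.remove()
  }
}
```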





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #42367: [SPARK-43429][CONNECT] Add Default & Active SparkSession for Scala Client

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #42367:
URL: https://github.com/apache/spark/pull/42367#discussion_r1285322076


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala:
##########
@@ -811,30 +826,84 @@ object SparkSession extends Logging {
      * If a session exist with the same configuration that is returned instead of creating a new
      * session.
      *
+     * This method will update the default and/or active session if they are not set.
+     *
      * @since 3.5.0
      */
     def getOrCreate(): SparkSession = {

Review Comment:
   Another annoying thing is applying options. In non-Spark Connect mode, we apply the specified runtime options even when an active session already exists.
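A simplified sketch of the classic-mode behaviour described above: when a session already exists, the builder's options are written into that session's runtime conf instead of being dropped. The `Session` and `Builder` classes here are hypothetical and ignore details such as static configs that cannot be changed.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a classic session with a mutable runtime conf.
final class Session {
  val conf: mutable.Map[String, String] = mutable.Map.empty
}

// Hypothetical builder: reuses an existing session if one is given, but still
// applies every option set on the builder to that session's conf.
final class Builder(existing: Option[Session]) {
  private val options = mutable.Map.empty[String, String]

  def config(key: String, value: String): Builder = {
    options(key) = value
    this
  }

  def getOrCreate(): Session = {
    val session = existing.getOrElse(new Session)
    options.foreach { case (k, v) => session.conf(k) = v }
    session
  }
}
```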





[GitHub] [spark] LuciferYang commented on pull request #42367: [SPARK-43429][CONNECT] Add Default & Active SparkSession for Scala Client

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on PR #42367:
URL: https://github.com/apache/spark/pull/42367#issuecomment-1669259474

   > Merging to master/3.5
   
   @hvanhovell Has this PR not been merged into branch-3.5? I couldn't find it in the commits list.
   
   
   
   Additionally, the newly added test case (`active session in multiple threads`) seems to be flaky. I've spotted some failures, for example:
   - https://github.com/apache/spark/actions/runs/5792277901/job/15698338336
   
   
   Re-running it passes.
   




[GitHub] [spark] HyukjinKwon commented on a diff in pull request #42367: [SPARK-43429][CONNECT] Add Default & Active SparkSession for Scala Client

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #42367:
URL: https://github.com/apache/spark/pull/42367#discussion_r1285320997


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala:
##########
@@ -811,30 +826,84 @@ object SparkSession extends Logging {
      * If a session exist with the same configuration that is returned instead of creating a new
      * session.
      *
+     * This method will update the default and/or active session if they are not set.
+     *
      * @since 3.5.0
      */
     def getOrCreate(): SparkSession = {
-      tryCreateSessionFromClient().getOrElse(sessions.get(builder.configuration))
+      val session = tryCreateSessionFromClient()
+        .getOrElse(sessions.get(builder.configuration))
+      // To be compatible with the SQL API we update the default
+      // and active session here if they are not set.
+      defaultSession.compareAndSet(null, session)
+      if (getActiveSession.isEmpty) {
+        setActiveSession(session)
+      }
+      session
     }
   }
 
-  def getActiveSession: Option[SparkSession] = {
-    throw new UnsupportedOperationException("getActiveSession is not supported")
+  /**
+   * Returns the default SparkSession.
+   *
+   * @since 3.5.0
+   */
+  def getDefaultSession: Option[SparkSession] = Option(defaultSession.get())
+
+  /**
+   * Sets the default SparkSession.
+   *
+   * @since 3.5.0
+   */
+  def setDefaultSession(session: SparkSession): Unit = {
+    defaultSession.set(session)
   }
 
-  def getDefaultSession: Option[SparkSession] = {
-    throw new UnsupportedOperationException("getDefaultSession is not supported")
+  /**
+   * Clears the default SparkSession.
+   *
+   * @since 3.5.0
+   */
+  def clearDefaultSession(): Unit = {
+    defaultSession.set(null)
   }
 
+  /**
+   * Returns the active SparkSession for the current thread.
+   *
+   * @since 3.5.0
+   */
+  def getActiveSession: Option[SparkSession] = Option(activeThreadSession.get())

Review Comment:
   One annoying thing would be this active session. In non-Spark Connect mode, we set the session as active whenever we call an API associated with the current Spark session.
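A minimal sketch of the classic-mode pattern described above. It assumes a hypothetical `Session` class whose public API methods (here a hypothetical `sql`) wrap their body in a hypothetical `withActive` helper. The helper makes the session active for the duration of the call, then restores whatever was active before.

```scala
// Hypothetical stand-in for a classic (non-Connect) session.
final class Session(val name: String) {
  // Make this session the active one while `block` runs, then restore the
  // previously active session (which may be null, meaning none).
  def withActive[T](block: => T): T = {
    val previous = Session.active.get()
    Session.active.set(this)
    try block
    finally Session.active.set(previous)
  }

  // Hypothetical public API method: every entry point goes through withActive,
  // so the session is active while the call runs.
  def sql(query: String): String = withActive {
    val current = Session.getActiveSession.map(_.name).getOrElse("none")
    s"$name ran: $query (active=$current)"
  }
}

object Session {
  private val active = new InheritableThreadLocal[Session]
  def getActiveSession: Option[Session] = Option(active.get())
}
```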





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #42367: [SPARK-43429][CONNECT] Add Default & Active SparkSession for Scala Client

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #42367:
URL: https://github.com/apache/spark/pull/42367#discussion_r1285345765


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala:
##########
@@ -811,30 +842,79 @@ object SparkSession extends Logging {
      * If a session exist with the same configuration that is returned instead of creating a new
      * session.
      *
+     * This method will update the default and/or active session if they are not set.
+     *
      * @since 3.5.0
      */
     def getOrCreate(): SparkSession = {
-      tryCreateSessionFromClient().getOrElse(sessions.get(builder.configuration))
+      val session = tryCreateSessionFromClient()
+        .getOrElse(sessions.get(builder.configuration))
+      setDefaultAndActiveSession(session)
+      session

Review Comment:
   Quick question: shouldn't we return the active session here? It seems like this will always return the newly created session, and only sets the active session if it is not already set.
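A minimal sketch illustrating the question. With the logic in the diff, `getOrCreate` returns the session for the builder's configuration, looked up in a cache, and only fills the active slot when it is empty. A second call with a different configuration therefore returns a new session while the first one stays active. The `Session` class, the `Registry` object, and the `sc://` config values are hypothetical.

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.collection.mutable

// Hypothetical stand-in for a session keyed by its configuration.
final class Session(val conf: Map[String, String])

object Registry {
  private val cache = mutable.Map.empty[Map[String, String], Session]
  private val defaultSession = new AtomicReference[Session](null)
  private val active = new InheritableThreadLocal[Session]

  def getActiveSession: Option[Session] = Option(active.get())

  // Mirrors the diff: return the session for *this* configuration and only
  // fill in the default/active session if they are empty.
  def getOrCreate(conf: Map[String, String]): Session = synchronized {
    val session = cache.getOrElseUpdate(conf, new Session(conf))
    defaultSession.compareAndSet(null, session)
    if (getActiveSession.isEmpty) active.set(session)
    session
  }
}
```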





[GitHub] [spark] hvanhovell closed pull request #42367: [SPARK-43429][CONNECT] Add Default & Active SparkSession for Scala Client

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell closed pull request #42367: [SPARK-43429][CONNECT] Add Default & Active SparkSession for Scala Client
URL: https://github.com/apache/spark/pull/42367

