Posted to reviews@spark.apache.org by "hvanhovell (via GitHub)" <gi...@apache.org> on 2023/08/07 02:20:15 UTC

[GitHub] [spark] hvanhovell commented on a diff in pull request #42367: [SPARK-43429][CONNECT] Add Default & Active SparkSession for Scala Client

hvanhovell commented on code in PR #42367:
URL: https://github.com/apache/spark/pull/42367#discussion_r1285321888


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala:
##########
@@ -811,30 +826,84 @@ object SparkSession extends Logging {
      * If a session exists with the same configuration, that session is returned instead of
      * creating a new one.
      *
+     * This method will update the default and/or active session if they are not set.
+     *
      * @since 3.5.0
      */
     def getOrCreate(): SparkSession = {
-      tryCreateSessionFromClient().getOrElse(sessions.get(builder.configuration))
+      val session = tryCreateSessionFromClient()
+        .getOrElse(sessions.get(builder.configuration))
+      // To be compatible with the SQL API we update the default
+      // and active session here if they are not set.
+      defaultSession.compareAndSet(null, session)
+      if (getActiveSession.isEmpty) {
+        setActiveSession(session)
+      }
+      session
     }
   }
 
-  def getActiveSession: Option[SparkSession] = {
-    throw new UnsupportedOperationException("getActiveSession is not supported")
+  /**
+   * Returns the default SparkSession.
+   *
+   * @since 3.5.0
+   */
+  def getDefaultSession: Option[SparkSession] = Option(defaultSession.get())
+
+  /**
+   * Sets the default SparkSession.
+   *
+   * @since 3.5.0
+   */
+  def setDefaultSession(session: SparkSession): Unit = {
+    defaultSession.set(session)
   }
 
-  def getDefaultSession: Option[SparkSession] = {
-    throw new UnsupportedOperationException("getDefaultSession is not supported")
+  /**
+   * Clears the default SparkSession.
+   *
+   * @since 3.5.0
+   */
+  def clearDefaultSession(): Unit = {
+    defaultSession.set(null)
   }
 
+  /**
+   * Returns the active SparkSession for the current thread.
+   *
+   * @since 3.5.0
+   */
+  def getActiveSession: Option[SparkSession] = Option(activeThreadSession.get())

Review Comment:
   That is for internals, because that is how SQLConf.get works. For the Scala client, which is very thin, we don't need these thread locals (we thread the confs through where we need them). There is also no user-code interaction in the internals, so we don't need them for that either.
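   For orientation, the hunk above only shows the call sites of the two holders the comment refers to. A minimal sketch of what they plausibly look like follows; the object name `SessionHolders` is hypothetical and the declarations are assumptions inferred from `compareAndSet(null, session)` and `activeThreadSession.get()`, not the PR's actual field definitions.

   ```scala
   import java.util.concurrent.atomic.AtomicReference

   import org.apache.spark.sql.SparkSession

   object SessionHolders {
     // Process-wide default session: compareAndSet(null, session) in getOrCreate()
     // only succeeds for the first caller, so the first session created becomes
     // the default until it is cleared or replaced.
     val defaultSession: AtomicReference[SparkSession] =
       new AtomicReference[SparkSession](null)

     // Per-thread active session: an InheritableThreadLocal (mirroring the SQL API
     // the reviewer mentions) lets child threads inherit the parent's active
     // session while still being able to override it locally.
     val activeThreadSession: InheritableThreadLocal[SparkSession] =
       new InheritableThreadLocal[SparkSession]
   }
   ```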



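A minimal usage sketch of the getter/setter surface added in the hunk above, assuming the Connect Scala client builder with a `remote(...)` connection string; the endpoint below is a placeholder for any reachable Spark Connect server.

```scala
import org.apache.spark.sql.SparkSession

// Placeholder endpoint; substitute the address of a running Spark Connect server.
val spark = SparkSession.builder().remote("sc://localhost:15002").getOrCreate()

// Per the new getOrCreate(), the default and active sessions are populated
// if they were previously unset.
assert(SparkSession.getDefaultSession.contains(spark))
assert(SparkSession.getActiveSession.contains(spark))

// The default session can be cleared or replaced independently of the
// thread-local active session.
SparkSession.clearDefaultSession()
assert(SparkSession.getDefaultSession.isEmpty)
SparkSession.setDefaultSession(spark)
```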
-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org
