You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "vicennial (via GitHub)" <gi...@apache.org> on 2023/07/06 16:44:02 UTC

[GitHub] [spark] vicennial commented on a diff in pull request #41495: [SPARK-44290][CONNECT] Session-based files and archives in Spark Connect

vicennial commented on code in PR #41495:
URL: https://github.com/apache/spark/pull/41495#discussion_r1254104413


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/artifact/SparkConnectArtifactManager.scala:
##########
@@ -132,10 +131,16 @@ class SparkConnectArtifactManager(sessionHolder: SessionHolder) extends Logging
       }
       Files.move(serverLocalStagingPath, target)
       if (remoteRelativePath.startsWith(s"jars${File.separator}")) {
+        sessionHolder.session.sessionState.resourceLoader
+          .addJar(target.toString, sessionHolder.sessionId)
         jarsList.add(target)
-        jarsURI.add(artifactURI + "/" + remoteRelativePath.toString)
       } else if (remoteRelativePath.startsWith(s"pyfiles${File.separator}")) {
-        sessionHolder.session.sparkContext.addFile(target.toString)
+        sessionHolder.session.sparkContext.addFile(
+          target.toString,
+          false,
+          false,
+          false,
+          sessionUUID)

Review Comment:
   Nit: Could we use named parameters here and below to improve readability?



##########
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala:
##########
@@ -165,6 +165,9 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
       envVars.put("PYTHON_FAULTHANDLER_DIR", BasePythonRunner.faultHandlerLogDir.toString)
     }
 
+    val sessionId = JobArtifactSet.getCurrentSessionUUID.getOrElse("default")
+    envVars.put("SPARK_CONNECT_SESSION_ID", sessionId)

Review Comment:
   I feel like the name`SPARK_CONNECT_SESSION_ID` may be misleading because a Spark Connect client has it's own [sessionId](https://github.com/apache/spark/blob/7bd1353084042bd5a55f6c564ad5e3359016e77d/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala#L57) while the `sessionUUID` that `JobArtifactSet` is using the UUID of the underlying `SparkSession`.



##########
core/src/main/scala/org/apache/spark/JobArtifactSet.scala:
##########
@@ -53,72 +56,70 @@ class JobArtifactSet(
           this.files.toSeq == that.files.toSeq && this.archives.toSeq == that.archives.toSeq
     }
   }
-
 }
 
-object JobArtifactSet {
 
-  private[this] val current = new ThreadLocal[Option[JobArtifactSet]] {
-    override def initialValue(): Option[JobArtifactSet] = None
-  }
+private[spark] object JobArtifactSet {
+  // For testing.
+  val emptyJobArtifactSet: JobArtifactSet = new JobArtifactSet(
+    None, None, Map.empty, Map.empty, Map.empty)
+  // For testing.
+  def defaultJobArtifactSet: JobArtifactSet = SparkContext.getActive.map(
+    getActiveOrDefault).getOrElse(emptyJobArtifactSet)
 
-  /**
-   * When Spark Connect isn't used, we default back to the shared resources.
-   * @param sc The active [[SparkContext]]
-   * @return A [[JobArtifactSet]] containing a copy of the jars/files/archives from the underlying
-   *         [[SparkContext]] `sc`.
-   */
-  def apply(sc: SparkContext): JobArtifactSet = {
-    new JobArtifactSet(
-      uuid = None,
-      replClassDirUri = sc.conf.getOption("spark.repl.class.uri"),
-      jars = sc.addedJars.toMap,
-      files = sc.addedFiles.toMap,
-      archives = sc.addedArchives.toMap)
+  private[this] val currentSessionUUID = new ThreadLocal[Option[String]] {
+    override def initialValue(): Option[String] = None
   }
 
-  private lazy val emptyJobArtifactSet = new JobArtifactSet(
-    None,
-    None,
-    Map.empty,
-    Map.empty,
-    Map.empty)
-
-  /**
-   * Empty artifact set for use in tests.
-   */
-  private[spark] def apply(): JobArtifactSet = emptyJobArtifactSet
-
-  /**
-   * Used for testing. Returns artifacts from [[SparkContext]] if one exists or otherwise, an
-   * empty set.
-   */
-  private[spark] def defaultArtifactSet(): JobArtifactSet = {
-    SparkContext.getActive.map(sc => JobArtifactSet(sc)).getOrElse(JobArtifactSet())
+  private[this] val currentReplClassDirURI = new ThreadLocal[Option[String]] {
+    override def initialValue(): Option[String] = None
   }
 
+  def getCurrentSessionUUID: Option[String] = currentSessionUUID.get()
+
   /**
-   * Execute a block of code with the currently active [[JobArtifactSet]].
-   * @param active
-   * @param block
-   * @tparam T
+   * Set the Spark Connect specific information in the active client to the underlying
+   * [[JobArtifactSet]].
+   *
+   * @param uuid UUID to use in the current context of jab artifact set. Usually this is from
+   *             a Spark Connect client.
+   * @param classDirUri The URI for the directory that stores REPL classes.
+   * @return the result from the function applied with [[JobArtifactSet]] specific to
+   *         the active client.
    */
-  def withActive[T](active: JobArtifactSet)(block: => T): T = {
-    val old = current.get()
-    current.set(Option(active))
+  def withActive[T](uuid: String, classDirUri: Option[String])(block: => T): T = {

Review Comment:
   IMO this feels a bit weird to use `withActive` this way because we are passing in a few of the arguments to create the `JobArtifactSet` but we don't create one and instead of rely on the eventual call to `getActiveOrDefault` to create the final instance.
   
   Why not have a helper method to create an instance and use the instance itself in `withActive`? 
   If we want to limit what the contents of `jars/files/archives` vals are, we could make the default constructor private and force creation through `apply` or helper methods.
   
   



##########
core/src/main/scala/org/apache/spark/JobArtifactSet.scala:
##########
@@ -33,13 +33,16 @@ import java.util.Objects
  * @param files Files belonging to this session.
  * @param archives Archives belonging to this session.
  */
-class JobArtifactSet(
+private[spark] class JobArtifactSet(
     val uuid: Option[String],
     val replClassDirUri: Option[String],
     val jars: Map[String, Long],
     val files: Map[String, Long],
     val archives: Map[String, Long]) extends Serializable {
-  def withActive[T](f: => T): T = JobArtifactSet.withActive(this)(f)
+  def withActive[T](f: => T): T = {
+    assert(uuid.nonEmpty)

Review Comment:
   Adding to the comment regd `withActive`, if we make the constructor private and control the contents of the jars/files/archives here, we could drop this requirement



##########
core/src/main/scala/org/apache/spark/SparkContext.scala:
##########
@@ -291,9 +292,16 @@ class SparkContext(config: SparkConf) extends Logging {
   private[spark] def env: SparkEnv = _env
 
   // Used to store a URL for each static file/jar together with the file's local timestamp

Review Comment:
   Nit: Documentation could use an update because the first `String` parameter now refers to the `sessionId` rather than the `URL`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org