Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/11/25 02:27:39 UTC

[GitHub] [spark] maropu commented on a change in pull request #30486: [SPARK-33530][CORE] Support --archives and spark.archives option natively

maropu commented on a change in pull request #30486:
URL: https://github.com/apache/spark/pull/30486#discussion_r530054680



##########
File path: core/src/main/scala/org/apache/spark/SparkContext.scala
##########
@@ -422,6 +426,8 @@ class SparkContext(config: SparkConf) extends Logging {
     _jars = Utils.getUserJars(_conf)
     _files = _conf.getOption(FILES.key).map(_.split(",")).map(_.filter(_.nonEmpty))
       .toSeq.flatten
+    _archives = _conf.getOption(ARCHIVES.key).map(_.split(",")).map(_.filter(_.nonEmpty))
+      .toSeq.flatten

Review comment:
       nit: `_archives = _conf.getOption(ARCHIVES.key).map(Utils.stringToSeq).toSeq.flatten`?

##########
File path: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
##########
@@ -512,6 +513,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
         |  --files FILES               Comma-separated list of files to be placed in the working
         |                              directory of each executor. File paths of these files
         |                              in executors can be accessed via SparkFiles.get(fileName).
+        |  --archives ARCHIVES         Comma separated list of archives to be extracted into the

Review comment:
       nit: `Comma separated` -> `Comma-separated` for consistency?

##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -1803,6 +1803,16 @@ package object config {
     .toSequence
     .createWithDefault(Nil)
 
+  private[spark] val ARCHIVES = ConfigBuilder("spark.archives")
+    .version("3.1.0")
+    .doc("Comma separated list of archives to be extracted into the working directory of each " +

Review comment:
       btw, don't we need to describe the relationship (or difference?) between `spark.files` and `spark.archives` here?
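       If so, a possible wording could be something like the sketch below (illustrative only;
       the builder chain after `.doc` is assumed to mirror the neighbouring configs):

```scala
private[spark] val ARCHIVES = ConfigBuilder("spark.archives")
  .version("3.1.0")
  .doc("Comma-separated list of archives to be extracted into the working directory of " +
    "each executor. Unlike files added via spark.files, which are distributed as-is, " +
    "archives listed here are unpacked on the executor side.")
  .stringConf
  .toSequence
  .createWithDefault(Nil)
```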

##########
File path: core/src/main/scala/org/apache/spark/util/Utils.scala
##########
@@ -494,7 +495,8 @@ private[spark] object Utils extends Logging {
       securityMgr: SecurityManager,
       hadoopConf: Configuration,
       timestamp: Long,
-      useCache: Boolean): File = {
+      useCache: Boolean,
+      shouldUntar: Boolean = true): File = {

Review comment:
       In L487, could you document what the new param `shouldUntar` is?
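       For example, the param doc could read something like the sketch below (wording is only
       a suggestion, based on how the flag seems to be used elsewhere in this PR):

```scala
/**
 * ...
 * @param shouldUntar whether the fetched file should also be unpacked when it is a tar
 *                    archive (defaults to true, keeping the existing behaviour); callers
 *                    that unpack the archive themselves, as the new spark.archives code
 *                    path does, can pass false.
 */
```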

##########
File path: core/src/main/scala/org/apache/spark/executor/Executor.scala
##########
@@ -230,16 +232,17 @@ private[spark] class Executor(
   private val appStartTime = conf.getLong("spark.app.startTime", 0)
 
   // To allow users to distribute plugins and their required files
-  // specified by --jars and --files on application submission, those jars/files should be
-  // downloaded and added to the class loader via updateDependencies.
-  // This should be done before plugin initialization below
+  // specified by --jars, --files and --archives on application submission, those
+  // jars/files/archives should be downloaded and added to the class loader vi

Review comment:
       nit: `vi` -> `via`

##########
File path: core/src/main/scala/org/apache/spark/SparkContext.scala
##########
@@ -1568,21 +1613,38 @@ class SparkContext(config: SparkConf) extends Logging {
 
     val key = if (!isLocal && scheme == "file") {
       env.rpcEnv.fileServer.addFile(new File(uri.getPath))
+    } else if (uri.getScheme == null) {
+      schemeCorrectedURI.toString
+    } else if (isArchive) {
+      uri.toString
     } else {
-        if (uri.getScheme == null) {
-          schemeCorrectedURI.toString
-        } else {
-          path
-        }
+      path
     }
+
     val timestamp = if (addedOnSubmit) startTime else System.currentTimeMillis
-    if (addedFiles.putIfAbsent(key, timestamp).isEmpty) {
+    if (!isArchive && addedFiles.putIfAbsent(key, timestamp).isEmpty) {
       logInfo(s"Added file $path at $key with timestamp $timestamp")
       // Fetch the file locally so that closures which are run on the driver can still use the
       // SparkFiles API to access files.
       Utils.fetchFile(uri.toString, new File(SparkFiles.getRootDirectory()), conf,
         env.securityManager, hadoopConfiguration, timestamp, useCache = false)
       postEnvironmentUpdate()
+    } else if (
+      isArchive &&
+        addedArchives.putIfAbsent(
+          UriBuilder.fromUri(new URI(key)).fragment(uri.getFragment).build().toString,
+          timestamp).isEmpty) {
+      logInfo(s"Added archive $path at $key with timestamp $timestamp")
+      val uriToDownload = UriBuilder.fromUri(new URI(key)).fragment(null).build()
+      val source = Utils.fetchFile(uriToDownload.toString, Utils.createTempDir(), conf,
+        env.securityManager, hadoopConfiguration, timestamp, useCache = false, shouldUntar = false)
+      logInfo(s"Unpacking an archive $path")

Review comment:
       How about logging the source and destination, too?
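       e.g. something like the line below (illustrative only; `dest` stands for the unpack
       target directory computed after this line, which is not shown in this hunk):

```scala
logInfo(s"Unpacking an archive $path from ${source.getAbsolutePath} to ${dest.getAbsolutePath}")
```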

##########
File path: core/src/main/scala/org/apache/spark/executor/Executor.scala
##########
@@ -907,32 +911,49 @@ private[spark] class Executor(
    * Download any missing dependencies if we receive a new set of files and JARs from the
    * SparkContext. Also adds any new JARs we fetched to the class loader.
    */
-  private def updateDependencies(newFiles: Map[String, Long], newJars: Map[String, Long]): Unit = {
+  private def updateDependencies(
+      newFiles: Map[String, Long],
+      newJars: Map[String, Long],
+      newArchives: Map[String, Long]): Unit = {
     lazy val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
     synchronized {
       // Fetch missing dependencies
       for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
-        logInfo("Fetching " + name + " with timestamp " + timestamp)
+        logInfo(s"Fetching $name with timestamp $timestamp")
         // Fetch file with useCache mode, close cache for local mode.
         Utils.fetchFile(name, new File(SparkFiles.getRootDirectory()), conf,
           env.securityManager, hadoopConf, timestamp, useCache = !isLocal)
         currentFiles(name) = timestamp
       }
+      for ((name, timestamp) <- newArchives if currentArchives.getOrElse(name, -1L) < timestamp) {
+        logInfo(s"Fetching $name with timestamp $timestamp")
+        val sourceURI = new URI(name)
+        val uriToDownload = UriBuilder.fromUri(sourceURI).fragment(null).build()
+        val source = Utils.fetchFile(uriToDownload.toString, Utils.createTempDir(), conf,
+          env.securityManager, hadoopConf, timestamp, useCache = false, shouldUntar = false)

Review comment:
       Can't we use the cache for this purpose?
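       For reference, mirroring the `newFiles` branch above would look like the sketch below;
       whether the fetch cache is actually safe to reuse for archives is exactly the question here:

```scala
// Sketch only: the same call, but with the cache enabled as in the file branch above.
val source = Utils.fetchFile(uriToDownload.toString, Utils.createTempDir(), conf,
  env.securityManager, hadoopConf, timestamp, useCache = !isLocal, shouldUntar = false)
```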

##########
File path: docs/configuration.md
##########
@@ -784,6 +784,17 @@ Apart from these, the following properties are also available, and may be useful
   </td>
   <td>2.3.0</td>
 </tr>
+<tr>
+  <td><code>spark.archives</code></td>
+  <td></td>
+  <td>
+    Comma separated list of archives to be extracted into the working directory of each executor.

Review comment:
       ditto: `Comma separated` -> `Comma-separated`

##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -1803,6 +1803,16 @@ package object config {
     .toSequence
     .createWithDefault(Nil)
 
+  private[spark] val ARCHIVES = ConfigBuilder("spark.archives")
+    .version("3.1.0")
+    .doc("Comma separated list of archives to be extracted into the working directory of each " +

Review comment:
       nit: `Comma separated` -> `Comma-separated` for consistency?
   



