Posted to reviews@spark.apache.org by "shuwang21 (via GitHub)" <gi...@apache.org> on 2023/08/05 03:55:12 UTC

[GitHub] [spark] shuwang21 opened a new pull request, #42357: [WIP][SPARK-44306][YARN] Group FileStatus with few RPC calls within Yarn Client

shuwang21 opened a new pull request, #42357:
URL: https://github.com/apache/spark/pull/42357

   <!--
   Thanks for sending a pull request!  Here are some tips for you:
     1. If this is your first time, please read our contributor guidelines: https://spark.apache.org/contributing.html
     2. Ensure you have added or run the appropriate tests for your PR: https://spark.apache.org/developer-tools.html
     3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][SPARK-XXXX] Your PR title ...'.
     4. Be sure to keep the PR description updated to reflect all changes.
     5. Please write your PR title to summarize what this PR proposes.
     6. If possible, provide a concise example to reproduce the issue for a faster review.
     7. If you want to add a new configuration, please read the guideline first for naming configurations in
        'core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala'.
     8. If you want to add or modify an error type or message, please read the guideline first in
        'core/src/main/resources/error/README.md'.
   -->
   
   ### What changes were proposed in this pull request?
   <!--
   Please clarify what changes you are proposing. The purpose of this section is to outline the changes and how this PR fixes the issue. 
   If possible, please consider writing useful notes for better and faster reviews in your PR. See the examples below.
     1. If you refactor some codes with changing classes, showing the class hierarchy will help reviewers.
     2. If you fix some SQL features, you can provide some references of other DBMSes.
     3. If there is design documentation, please add the link.
     4. If there is a discussion in the mailing list, please add the link.
   -->
   
   
   ### Why are the changes needed?
   <!--
   Please clarify why the changes are needed. For instance,
     1. If you propose a new API, clarify the use case for a new API.
     2. If you fix a bug, you can clarify why it is a bug.
   -->
   
   
   ### Does this PR introduce _any_ user-facing change?
   <!--
   Note that it means *any* user-facing change including all aspects such as the documentation fix.
   If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible.
   If possible, please also clarify if this is a user-facing change compared to the released Spark versions or within the unreleased branches such as master.
   If no, write 'No'.
   -->
   
   
   ### How was this patch tested?
   <!--
   If tests were added, say they were added here. Please make sure to add some test cases that check the changes thoroughly including negative and positive cases if possible.
   If it was tested in a way different from regular unit tests, please clarify how you tested step by step, ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future.
   If tests were not added, please describe why they were not added and/or why it was difficult to add.
   If benchmark tests were added, please run the benchmarks in GitHub Actions for the consistent environment, and the instructions could accord to: https://spark.apache.org/developer-tools.html#github-workflow-benchmarks.
   -->
   




[GitHub] [spark] mridulm commented on a diff in pull request #42357: [SPARK-44306][YARN] Group FileStatus with few RPC calls within Yarn Client

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #42357:
URL: https://github.com/apache/spark/pull/42357#discussion_r1292600902


##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +461,52 @@ private[spark] class Client(
     new Path(resolvedDestDir, qualifiedDestPath.getName())
   }
 
+  /**
+   * For each non-local and non-glob resource, we will count its parent directory. If its
+   * frequency is larger than the threshold specified by
+   * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, all the file status from the
+   * directory will be preloaded.
+   *
+   * @param jars : the list of jars to upload
+   * @return a list of directories to be preloaded
+   * */
+  private[yarn] def directoriesToBePreloaded(jars: Seq[String]): List[URI] = {
+    val directoryCounter = new HashMap[URI, Int]()
+    jars.foreach { jar =>
+      if (!Utils.isLocalUri(jar) && !new GlobPattern(jar).hasWildcard) {
+        val parentUri = new Path(Utils.resolveURI(jar)).getParent.toUri
+        directoryCounter.update(parentUri, directoryCounter.getOrElse(parentUri, 0) + 1)
+      }
+    }
+    directoryCounter.filter(_._2 >= statCachePreloadDirectoryCountThreshold).keys.toList
+  }
+
+  /**
+   * Preload the statCache with file status. For each directory in the list, we will list all
+   * files from that directory and add them to the statCache.
+   *
+   * @param fsLookup: Function for looking up an FS based on a URI; override for testing
+   * @return A preloaded statCache with fileStatus
+   */
+  private[yarn] def getPreloadedStatCache(
+      fsLookup: URI => FileSystem = FileSystem.get(_, hadoopConf)): HashMap[URI, FileStatus] = {
+    val statCache = HashMap[URI, FileStatus]()
+    val jars = sparkConf.get(SPARK_JARS)
+    val directories = jars.map(directoriesToBePreloaded).getOrElse(ArrayBuffer.empty[URI])
+
+    directories.foreach { dir =>
+      fsLookup(dir).listStatus(new Path(dir)).filter(_.isFile())
+        .foreach { fileStatus =>
+          val uri = fileStatus.getPath.toUri
+          if (jars.exists(_.contains(uri.toString))) {
+            logDebug(s"Add ${uri} file status to statCache.")
+            statCache.put(uri, fileStatus)
+          }
+        }
+    }

Review Comment:
   `jars` contains unresolved paths, while `dir` has been resolved - which can result in `jars.exists(_.contains(uri.toString))` returning `false`.
   How about returning a `Map[dir: String, Set[fileName: String]]` from `directoriesToBePreloaded` and checking for `fileStatus.getPath.getName` in the value set?
   
   Reformulate it as:
   
   ```suggestion
       directoryToFiles.foreach { case (dir: String, filesInDir: Set[String]) =>
         fsLookup(dir).listStatus(new Path(dir)).filter(_.isFile()).
           filter(f => filesInDir.contains(f.getPath.getName)).foreach { fileStatus =>
             val uri = fileStatus.getPath.toUri
             statCache.put(uri, fileStatus)
           }
       }
   ```





[GitHub] [spark] shuwang21 commented on a diff in pull request #42357: [SPARK-44306][YARN] Group FileStatus with few RPC calls within Yarn Client

Posted by "shuwang21 (via GitHub)" <gi...@apache.org>.
shuwang21 commented on code in PR #42357:
URL: https://github.com/apache/spark/pull/42357#discussion_r1327901950


##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala:
##########
@@ -462,6 +462,30 @@ package object config extends Logging {
     .stringConf
     .createWithDefault("yarn.io/fpga")
 
+  private[spark] val YARN_CLIENT_STAT_CACHE_PRELOAD_ENABLED =
+    ConfigBuilder("spark.yarn.client.statCache.preload.enabled")
+    .doc("This configuration enables statCache to be preloaded at YARN client side. This feature " +
+      "analyzes the pattern of resources paths, and if multiple resources shared the same parent " +
+      "directory, a single <code>listStatus</code> will be invoked on the parent directory " +
+      "instead of multiple <code>getFileStatus</code> performed on each individual resources. " +
+      "If most resources from a small set of directories, this can substantially improve job " +
+      "submission time. Enabling this feature may potentially increase client memory overhead.")

Review Comment:
   Do you mean `listStatus` with a `PathFilter`? I can try that. 
   
   https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/fs/FileSystem.html#listStatus-org.apache.hadoop.fs.Path-org.apache.hadoop.fs.PathFilter-
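   
   For reference, a minimal sketch of that variant (the directory, file names, and configuration here are invented for illustration):
   
   ```scala
   import java.net.URI
   import org.apache.hadoop.conf.Configuration
   import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
   
   val conf = new Configuration()
   val dir = new URI("hdfs://namenode:8020/user/spark/libs") // hypothetical directory
   val filesInDir = Set("a.jar", "b.jar")                    // hypothetical jar names
   
   // listStatus with a PathFilter: only entries accepted by the filter are
   // returned to the caller, rather than every FileStatus in the directory.
   val fs = FileSystem.get(dir, conf)
   val statuses = fs.listStatus(new Path(dir), new PathFilter {
     override def accept(path: Path): Boolean = filesInDir.contains(path.getName)
   })
   ```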





[GitHub] [spark] mridulm commented on a diff in pull request #42357: [SPARK-44306][YARN] Group FileStatus with few RPC calls within Yarn Client

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #42357:
URL: https://github.com/apache/spark/pull/42357#discussion_r1330352323


##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -533,9 +536,12 @@ private[spark] class Client(
     // If preload is enabled, preload the statCache with the files in the directories
     val statCache = if (statCachePreloadEnabled) {
       // Consider only following configurations, as they involve the distribution of multiple files
-      val files = sparkConf.get(SPARK_JARS).orNull ++ sparkConf.get(JARS_TO_DISTRIBUTE) ++
-        sparkConf.get(FILES_TO_DISTRIBUTE) ++ sparkConf.get(ARCHIVES_TO_DISTRIBUTE) ++
-        sparkConf.get(PY_FILES) ++ pySparkArchives
+      var files = sparkConf.get(JARS_TO_DISTRIBUTE) ++ sparkConf.get(FILES_TO_DISTRIBUTE) ++
+        sparkConf.get(ARCHIVES_TO_DISTRIBUTE) ++ sparkConf.get(PY_FILES) ++ pySparkArchives
+      if (!sparkConf.get(SPARK_JARS).isEmpty) {

Review Comment:
   That was to simply inline it - like what you did now :-)





[GitHub] [spark] mridulm commented on a diff in pull request #42357: [SPARK-44306][YARN] Group FileStatus with few RPC calls within Yarn Client

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #42357:
URL: https://github.com/apache/spark/pull/42357#discussion_r1318064457


##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -492,10 +492,8 @@ private[spark] class Client(
   private[yarn] def getPreloadedStatCache(
       fsLookup: URI => FileSystem = FileSystem.get(_, hadoopConf)): HashMap[URI, FileStatus] = {
     val statCache = HashMap[URI, FileStatus]()
-    val jars = sparkConf.get(SPARK_JARS)
-    val directoryToFiles = jars.map(directoriesToBePreloaded).getOrElse(HashMap.empty)
-
-    directoryToFiles.foreach { case (dir: URI, filesInDir: HashSet[String]) =>
+    val jars = sparkConf.get(JARS_TO_DISTRIBUTE)

Review Comment:
   This does not look right - `statCache` is primarily used with jars in [SPARK_JARS](https://github.com/apache/spark/blob/622bbf2e29262c34021cb38c4c70f8eed258999b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala#L604C21-L604C31).
   
   Why change this ?





[GitHub] [spark] mridulm commented on a diff in pull request #42357: [SPARK-44306][YARN] Group FileStatus with few RPC calls within Yarn Client

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #42357:
URL: https://github.com/apache/spark/pull/42357#discussion_r1318069261


##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -492,10 +492,8 @@ private[spark] class Client(
   private[yarn] def getPreloadedStatCache(
       fsLookup: URI => FileSystem = FileSystem.get(_, hadoopConf)): HashMap[URI, FileStatus] = {
     val statCache = HashMap[URI, FileStatus]()
-    val jars = sparkConf.get(SPARK_JARS)
-    val directoryToFiles = jars.map(directoriesToBePreloaded).getOrElse(HashMap.empty)
-
-    directoryToFiles.foreach { case (dir: URI, filesInDir: HashSet[String]) =>
+    val jars = sparkConf.get(JARS_TO_DISTRIBUTE)

Review Comment:
   I am not sure I understood the comment :-)
   `statCache` is used within `prepareLocalResources` - within that method, it is primarily used in the context of `SPARK_JARS` - so why change to a different config?
   
   Or in other words, if it applies to both - why only `JARS_TO_DISTRIBUTE` and not to all relevant configs ?





[GitHub] [spark] mridulm commented on a diff in pull request #42357: [SPARK-44306][YARN] Group FileStatus with few RPC calls within Yarn Client

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #42357:
URL: https://github.com/apache/spark/pull/42357#discussion_r1294151353


##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +461,52 @@ private[spark] class Client(
     new Path(resolvedDestDir, qualifiedDestPath.getName())
   }
 
+  /**
+   * For each non-local and non-glob resource, we will count its parent directory. If its
+   * frequency is larger than the threshold specified by
+   * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, all the file status from the
+   * directory will be preloaded.
+   *
+   * @param jars : the list of jars to upload
+   * @return a list of directories to be preloaded
+   * */
+  private[yarn] def directoriesToBePreloaded(jars: Seq[String]): List[URI] = {
+    val directoryCounter = new HashMap[URI, Int]()
+    jars.foreach { jar =>
+      if (!Utils.isLocalUri(jar) && !new GlobPattern(jar).hasWildcard) {
+        val parentUri = new Path(Utils.resolveURI(jar)).getParent.toUri
+        directoryCounter.update(parentUri, directoryCounter.getOrElse(parentUri, 0) + 1)
+      }
+    }
+    directoryCounter.filter(_._2 >= statCachePreloadDirectoryCountThreshold).keys.toList
+  }
+
+  /**
+   * Preload the statCache with file status. For each directory in the list, we will list all
+   * files from that directory and add them to the statCache.
+   *
+   * @param fsLookup: Function for looking up an FS based on a URI; override for testing
+   * @return A preloaded statCache with fileStatus
+   */
+  private[yarn] def getPreloadedStatCache(
+      fsLookup: URI => FileSystem = FileSystem.get(_, hadoopConf)): HashMap[URI, FileStatus] = {
+    val statCache = HashMap[URI, FileStatus]()
+    val jars = sparkConf.get(SPARK_JARS)
+    val directories = jars.map(directoriesToBePreloaded).getOrElse(ArrayBuffer.empty[URI])
+
+    directories.foreach { dir =>
+      fsLookup(dir).listStatus(new Path(dir)).filter(_.isFile())
+        .foreach { fileStatus =>
+          val uri = fileStatus.getPath.toUri
+          if (jars.exists(_.contains(uri.toString))) {
+            logDebug(s"Add ${uri} file status to statCache.")
+            statCache.put(uri, fileStatus)
+          }
+        }
+    }

Review Comment:
   Perhaps my comment was not very clear.
   You can test what I am referring to by checking for something with unresolved path in spark.jar - for example, something like `hdfs://host:123/a/b/c/../file`
   
   The `jars.exists(_.contains(uri.toString))` check will return false - and so not cache this as `Path` always resolves the URI - and so will return `/a/b/file` for Path here.
   This pattern is quite common when users are specifying jars (use of `..`, `./`, etc. in the path)
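   
   A minimal sketch of the mismatch (host and paths invented): the `FileStatus` paths returned by `listStatus` come back fully resolved, while the configured string may still contain `.` or `..` segments.
   
   ```scala
   val configuredJar = "hdfs://host:123/a/b/c/../file" // raw value from spark.jars
   val listedUri     = "hdfs://host:123/a/b/file"      // resolved path of a listed FileStatus
   // The substring check fails, so this jar is never cached:
   assert(!configuredJar.contains(listedUri))
   ```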





[GitHub] [spark] shuwang21 commented on a diff in pull request #42357: [SPARK-44306][YARN] Group FileStatus with few RPC calls within Yarn Client

Posted by "shuwang21 (via GitHub)" <gi...@apache.org>.
shuwang21 commented on code in PR #42357:
URL: https://github.com/apache/spark/pull/42357#discussion_r1329667850


##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -533,9 +536,12 @@ private[spark] class Client(
     // If preload is enabled, preload the statCache with the files in the directories
     val statCache = if (statCachePreloadEnabled) {
       // Consider only following configurations, as they involve the distribution of multiple files
-      val files = sparkConf.get(SPARK_JARS).orNull ++ sparkConf.get(JARS_TO_DISTRIBUTE) ++
-        sparkConf.get(FILES_TO_DISTRIBUTE) ++ sparkConf.get(ARCHIVES_TO_DISTRIBUTE) ++
-        sparkConf.get(PY_FILES) ++ pySparkArchives
+      var files = sparkConf.get(JARS_TO_DISTRIBUTE) ++ sparkConf.get(FILES_TO_DISTRIBUTE) ++
+        sparkConf.get(ARCHIVES_TO_DISTRIBUTE) ++ sparkConf.get(PY_FILES) ++ pySparkArchives
+      if (!sparkConf.get(SPARK_JARS).isEmpty) {

Review Comment:
   I was following the suggestion from here, so it seems the `if` is not needed. Updated.
   https://github.com/apache/spark/pull/42357#discussion_r1320755309
   
   





[GitHub] [spark] mridulm commented on pull request #42357: [SPARK-44306][YARN] Group FileStatus with few RPC calls within Yarn Client

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #42357:
URL: https://github.com/apache/spark/pull/42357#issuecomment-1726224793

   Merged to master.
   Thanks for fixing this @shuwang21 !
   Thanks for the reviews @xkrogen, @venkata91 and @ShreyeshArangath :-)




[GitHub] [spark] xkrogen commented on a diff in pull request #42357: [SPARK-44306][YARN] Group FileStatus with few RPC calls within Yarn Client

Posted by "xkrogen (via GitHub)" <gi...@apache.org>.
xkrogen commented on code in PR #42357:
URL: https://github.com/apache/spark/pull/42357#discussion_r1291564583


##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -471,38 +470,40 @@ private[spark] class Client(
    * @param jars : the list of jars to upload
    * @return a list of directories to be preloaded
    * */
-  def directoriesToBePreloaded(jars: Seq[String]): mutable.Iterable[URI] = {
+  private[yarn] def directoriesToBePreloaded(jars: Seq[String]): List[URI] = {
     val directoryCounter = new HashMap[URI, Int]()
     jars.foreach { jar =>
       if (!Utils.isLocalUri(jar) && !new GlobPattern(jar).hasWildcard) {
-        val path = getQualifiedLocalPath(Utils.resolveURI(jar), hadoopConf)
-        val parentUri = path.getParent.toUri
+        val parentUri = new Path (Utils.resolveURI(jar)).getParent.toUri
         directoryCounter.update(parentUri, directoryCounter.getOrElse(parentUri, 0) + 1)
       }
     }
-    directoryCounter.filter(_._2 >= perDirectoryThreshold).keys
+    directoryCounter.filter(_._2 >= statCachePreloadDirectoryCountThreshold).keys.toList
   }
 
   /**
    * Preload the statCache with file status. For each directory in the list, we will list all
    * files from that directory and add them to the statCache.
-   *
-   * @param fs : the target file system
-   * @param statCache : stat cache to be preloaded with fileStatus
+   * @param testFS : the file system to be used for testing only (optional)
+   * @return A preloaded statCache with fileStatus
    */
-  def statCachePreload(fs: FileSystem, statCache: Map[URI, FileStatus]): Unit = {
-    logDebug("Preload the following directories")
-    val directory = sparkConf.get(SPARK_JARS)
-      .map(directoriesToBePreloaded)
-      .getOrElse(ArrayBuffer.empty[URI])
-
-    directory.foreach { dir =>
-      fs.listStatus(new Path(dir)).filter(_.isFile()).foreach { fileStatus =>
-        val uri = fileStatus.getPath.toUri
-        logDebug(s"add ${uri} plan to added to stat cache.")
-        statCache.put(uri, fileStatus)
-      }
+  private[yarn] def getPreloadedStatCache(testFS: Option[FileSystem]):
+  HashMap[URI, FileStatus] = {
+    val statCache = HashMap[URI, FileStatus]()
+    val jars = sparkConf.get(SPARK_JARS)
+    val directories = jars.map(directoriesToBePreloaded).getOrElse(ArrayBuffer.empty[URI])
+
+    directories.foreach { dir =>
+      testFS.getOrElse(FileSystem.get(dir, hadoopConf)).listStatus(new Path(dir)).filter(_.isFile())

Review Comment:
   The `testFS` feels a little awkward to me. I didn't see any super clean way to do it without some major refactoring to `yarn.Client`, but how about this?
   ```suggestion
      * @param fsLookup: Function for looking up an FS based on a URI; override for testing
      * @return A preloaded statCache with fileStatus
      */
     private[yarn] def getPreloadedStatCache(
         fsLookup: URI => FileSystem = FileSystem.get(_, hadoopConf)): HashMap[URI, FileStatus] = {
       val statCache = HashMap[URI, FileStatus]()
       val jars = sparkConf.get(SPARK_JARS)
       val directories = jars.map(directoriesToBePreloaded).getOrElse(ArrayBuffer.empty[URI])
   
       directories.foreach { dir =>
         fsLookup(dir).listStatus(new Path(dir)).filter(_.isFile())
   ```
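   
   A test could then inject a filesystem like this (sketch; `mockFs` and `client` are assumed test fixtures, not code from this PR):
   
   ```scala
   import org.apache.hadoop.fs.FileSystem
   import org.mockito.Mockito.mock
   
   val mockFs = mock(classOf[FileSystem])
   // Production callers rely on the default FileSystem.get(_, hadoopConf);
   // a test passes a lookup that ignores the URI and returns the stub.
   val statCache = client.getPreloadedStatCache(fsLookup = _ => mockFs)
   ```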





[GitHub] [spark] shuwang21 commented on a diff in pull request #42357: [SPARK-44306][YARN] Group FileStatus with few RPC calls within Yarn Client

Posted by "shuwang21 (via GitHub)" <gi...@apache.org>.
shuwang21 commented on code in PR #42357:
URL: https://github.com/apache/spark/pull/42357#discussion_r1292892702


##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -485,7 +534,12 @@ private[spark] class Client(
     val localResources = HashMap[String, LocalResource]()
     FileSystem.mkdirs(fs, destDir, new FsPermission(STAGING_DIR_PERMISSION))
 
-    val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
+    // If preload is enabled, preload the statCache with the files in the directories
+    val statCache = if (statCachePreloadEnabled) {
+      getPreloadedStatCache()
+    } else {
+      HashMap[URI, FileStatus]()

Review Comment:
   Based on the original code `val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()`, maybe just keep the current one.





[GitHub] [spark] shuwang21 commented on a diff in pull request #42357: [SPARK-44306][YARN] Group FileStatus with few RPC calls within Yarn Client

Posted by "shuwang21 (via GitHub)" <gi...@apache.org>.
shuwang21 commented on code in PR #42357:
URL: https://github.com/apache/spark/pull/42357#discussion_r1292888633


##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala:
##########
@@ -462,6 +462,31 @@ package object config extends Logging {
     .stringConf
     .createWithDefault("yarn.io/fpga")
 
+  private[spark] val YARN_CLIENT_STAT_CACHE_PRELOAD_ENABLED =
+    ConfigBuilder("spark.yarn.client.statCache.preload.enabled")
+    .doc("This configuration enables statCache to be preloaded at YARN client side. This feature" +

Review Comment:
   Thanks for the feedback. I will keep the current version. 





[GitHub] [spark] mridulm commented on a diff in pull request #42357: [SPARK-44306][YARN] Group FileStatus with few RPC calls within Yarn Client

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #42357:
URL: https://github.com/apache/spark/pull/42357#discussion_r1318069261


##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -492,10 +492,8 @@ private[spark] class Client(
   private[yarn] def getPreloadedStatCache(
       fsLookup: URI => FileSystem = FileSystem.get(_, hadoopConf)): HashMap[URI, FileStatus] = {
     val statCache = HashMap[URI, FileStatus]()
-    val jars = sparkConf.get(SPARK_JARS)
-    val directoryToFiles = jars.map(directoriesToBePreloaded).getOrElse(HashMap.empty)
-
-    directoryToFiles.foreach { case (dir: URI, filesInDir: HashSet[String]) =>
+    val jars = sparkConf.get(JARS_TO_DISTRIBUTE)

Review Comment:
   (Edited comment):
   
   I am not sure I understood the comment :-)
   `statCache` is used within `prepareLocalResources` - within that method, it is used in the context of `SPARK_JARS` and the `distribute` method - so why use it only for `JARS_TO_DISTRIBUTE`?
   
   Or in other words, if it applies to all of these sequences - why only `JARS_TO_DISTRIBUTE` and not all relevant configs? (`SPARK_JARS`, `JARS_TO_DISTRIBUTE`, `FILES_TO_DISTRIBUTE`, `ARCHIVES_TO_DISTRIBUTE`, `pySparkArchives`, `PY_FILES`)





[GitHub] [spark] shuwang21 commented on a diff in pull request #42357: [SPARK-44306][YARN] Group FileStatus with few RPC calls within Yarn Client

Posted by "shuwang21 (via GitHub)" <gi...@apache.org>.
shuwang21 commented on code in PR #42357:
URL: https://github.com/apache/spark/pull/42357#discussion_r1291039324


##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +462,51 @@ private[spark] class Client(
     new Path(resolvedDestDir, qualifiedDestPath.getName())
   }
 
+  /**
+   * For each non-local and non-glob resource, we will count its parent directory. If its
+   * frequency is larger than the threshold specified by
+   * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, all the file status from the
+   * directory will be preloaded.
+   *
+   * @param jars : the list of jars to upload
+   * @return a list of directories to be preloaded
+   * */
+  def directoriesToBePreloaded(jars: Seq[String]): mutable.Iterable[URI] = {
+    val directoryCounter = new HashMap[URI, Int]()
+    jars.foreach { jar =>
+      if (!Utils.isLocalUri(jar) && !new GlobPattern(jar).hasWildcard) {
+        val path = getQualifiedLocalPath(Utils.resolveURI(jar), hadoopConf)
+        val parentUri = path.getParent.toUri
+        directoryCounter.update(parentUri, directoryCounter.getOrElse(parentUri, 0) + 1)
+      }
+    }
+    directoryCounter.collect {
+      case (uri, count) if count >= perDirectoryThreshold => uri
+    }
+  }
+
+  /**
+   * Preload the statCache with file status. For each directory in the list, we will list all
+   * files from that directory and add them to the statCache.
+   *
+   * @param fs : the target file system
+   * @param statCache : stat cache to be preloaded with fileStatus
+   */
+  def statCachePreload(fs: FileSystem, statCache: Map[URI, FileStatus]): Unit = {
+    logDebug("Preload the following directories")
+    val directory = sparkConf.get(SPARK_JARS)
+      .map(directoriesToBePreloaded)
+      .getOrElse(ArrayBuffer.empty[URI])
+
+    directory.foreach { dir =>
+      fs.listStatus(new Path(dir)).filter(_.isFile()).foreach { fileStatus =>

Review Comment:
   I see. Thanks for catching this. Please take another look. 





[GitHub] [spark] shuwang21 commented on a diff in pull request #42357: [SPARK-44306][YARN] Group FileStatus with few RPC calls within Yarn Client

Posted by "shuwang21 (via GitHub)" <gi...@apache.org>.
shuwang21 commented on code in PR #42357:
URL: https://github.com/apache/spark/pull/42357#discussion_r1292885986


##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +461,52 @@ private[spark] class Client(
     new Path(resolvedDestDir, qualifiedDestPath.getName())
   }
 
+  /**
+   * For each non-local and non-glob resource, we will count its parent directory. If its
+   * frequency is larger than the threshold specified by
+   * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, all the file status from the
+   * directory will be preloaded.
+   *
+   * @param jars : the list of jars to upload
+   * @return a list of directories to be preloaded
+   * */
+  private[yarn] def directoriesToBePreloaded(jars: Seq[String]): List[URI] = {
+    val directoryCounter = new HashMap[URI, Int]()
+    jars.foreach { jar =>
+      if (!Utils.isLocalUri(jar) && !new GlobPattern(jar).hasWildcard) {
+        val parentUri = new Path(Utils.resolveURI(jar)).getParent.toUri
+        directoryCounter.update(parentUri, directoryCounter.getOrElse(parentUri, 0) + 1)
+      }
+    }
+    directoryCounter.filter(_._2 >= statCachePreloadDirectoryCountThreshold).keys.toList
+  }
+
+  /**
+   * Preload the statCache with file status. For each directory in the list, we will list all
+   * files from that directory and add them to the statCache.
+   *
+   * @param fsLookup: Function for looking up an FS based on a URI; override for testing
+   * @return A preloaded statCache with fileStatus
+   */
+  private[yarn] def getPreloadedStatCache(
+      fsLookup: URI => FileSystem = FileSystem.get(_, hadoopConf)): HashMap[URI, FileStatus] = {
+    val statCache = HashMap[URI, FileStatus]()
+    val jars = sparkConf.get(SPARK_JARS)
+    val directories = jars.map(directoriesToBePreloaded).getOrElse(ArrayBuffer.empty[URI])
+
+    directories.foreach { dir =>
+      fsLookup(dir).listStatus(new Path(dir)).filter(_.isFile())
+        .foreach { fileStatus =>
+          val uri = fileStatus.getPath.toUri
+          if (jars.exists(_.contains(uri.toString))) {
+            logDebug(s"Add ${uri} file status to statCache.")
+            statCache.put(uri, fileStatus)
+          }
+        }
+    }

Review Comment:
   @mridulm I think this is fine and intended. `directoriesToBePreloaded` only checks non-local and non-glob resources; for unresolved paths or glob paths, we will not preload. The existing code `pathFs.globStatus(path)` (L669, Client.scala) will handle those later on. That is just one RPC call, so there is no need to optimize this case.
   
   > `jars` contains unresolved paths, while `dir` has been resolved, and this can lead to `jars.exists(_.contains(uri.toString))` returning `false`.
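   
   For context, a small sketch of the check in question (paths invented): globbed resources are excluded from preloading and left to the existing `globStatus` code path.
   
   ```scala
   import org.apache.hadoop.fs.GlobPattern
   
   new GlobPattern("hdfs://host/libs/*.jar").hasWildcard // true  -> not preloaded
   new GlobPattern("hdfs://host/libs/a.jar").hasWildcard // false -> preload candidate
   ```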





[GitHub] [spark] shuwang21 commented on a diff in pull request #42357: [SPARK-44306][YARN] Group FileStatus with few RPC calls within Yarn Client

Posted by "shuwang21 (via GitHub)" <gi...@apache.org>.
shuwang21 commented on code in PR #42357:
URL: https://github.com/apache/spark/pull/42357#discussion_r1292889034


##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala:
##########
@@ -462,6 +462,31 @@ package object config extends Logging {
     .stringConf
     .createWithDefault("yarn.io/fpga")
 
+  private[spark] val YARN_CLIENT_STAT_CACHE_PRELOAD_ENABLED =
+    ConfigBuilder("spark.yarn.client.statCache.preload.enabled")
+    .doc("This configuration enables statCache to be preloaded at YARN client side. This feature" +
+      " analyzes the pattern of resources paths. If multiple resources shared the same parent" +
+      " directory, a single <code>listStatus</code> will be invoked on the parent directory" +
+      " instead of multiple <code>getFileStatus</code> performed on each individual resources." +
+      " If most resources from a small set of directories, this can substantially improve job" +
+      " submission time. Noticing, this could potentially increase the memory overhead at client" +
+      " side.")
+    .version("4.0.0")
+    .booleanConf
+    .createWithDefault(false)
+
+  private[spark] val YARN_CLIENT_STAT_CACHE_PRELOAD_PER_DIRECTORY_THRESHOLD =
+    ConfigBuilder("spark.yarn.client.statCache.preload.perDirectoryThreshold")
+      .doc("This configuration defines the threshold for the number of resources in a directory" +

Review Comment:
   Thanks for the feedback. Both look good to me, so I will keep the current one. 





[GitHub] [spark] mridulm commented on a diff in pull request #42357: [SPARK-44306][YARN] Group FileStatus with few RPC calls within Yarn Client

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #42357:
URL: https://github.com/apache/spark/pull/42357#discussion_r1320755309


##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -485,7 +530,16 @@ private[spark] class Client(
     val localResources = HashMap[String, LocalResource]()
     FileSystem.mkdirs(fs, destDir, new FsPermission(STAGING_DIR_PERMISSION))
 
-    val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
+    // If preload is enabled, preload the statCache with the files in the directories
+    val statCache = if (statCachePreloadEnabled) {
+      // Consider only following configurations, as they involve the distribution of multiple files
+      val files = sparkConf.get(SPARK_JARS).orNull ++ sparkConf.get(JARS_TO_DISTRIBUTE) ++

Review Comment:
   `sparkConf.get(SPARK_JARS).orNull` -> `sparkConf.get(SPARK_JARS).getOrElse(Nil)`
   Can you add a condition to the test where `SPARK_JARS` is empty to make sure this is caught ?
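   
   To illustrate the hazard with a made-up minimal case (not the PR's code): concatenating onto `orNull` throws when the config is unset.
   
   ```scala
   val sparkJars: Option[Seq[String]] = None                // SPARK_JARS not set
   
   // sparkJars.orNull ++ Seq("extra.jar")                  // NullPointerException
   val files = sparkJars.getOrElse(Nil) ++ Seq("extra.jar") // Seq("extra.jar")
   ```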
   
   



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala:
##########
@@ -462,6 +462,30 @@ package object config extends Logging {
     .stringConf
     .createWithDefault("yarn.io/fpga")
 
+  private[spark] val YARN_CLIENT_STAT_CACHE_PRELOAD_ENABLED =
+    ConfigBuilder("spark.yarn.client.statCache.preload.enabled")
+    .doc("This configuration enables statCache to be preloaded at YARN client side. This feature " +
+      "analyzes the pattern of resources paths, and if multiple resources shared the same parent " +
+      "directory, a single <code>listStatus</code> will be invoked on the parent directory " +
+      "instead of multiple <code>getFileStatus</code> performed on each individual resources. " +
+      "If most resources from a small set of directories, this can substantially improve job " +

Review Comment:
   ```suggestion
         "If most are resources from a small set of directories, this can improve job " +
   ```



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala:
##########
@@ -462,6 +462,30 @@ package object config extends Logging {
     .stringConf
     .createWithDefault("yarn.io/fpga")
 
+  private[spark] val YARN_CLIENT_STAT_CACHE_PRELOAD_ENABLED =
+    ConfigBuilder("spark.yarn.client.statCache.preload.enabled")
+    .doc("This configuration enables statCache to be preloaded at YARN client side. This feature " +

Review Comment:
   ```suggestion
       .doc("Enables statCache to be preloaded at YARN client side. This feature " +
   ```



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +461,48 @@ private[spark] class Client(
     new Path(resolvedDestDir, qualifiedDestPath.getName())
   }
 
+  /**
+   * For each non-local and non-glob resource, we will count its parent directory. If its
+   * frequency is larger than the threshold specified by
+   * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, the corresponding file status
+   * from the directory will be preloaded.
+   *
+   * @param jars : the list of jars to upload
+   * @return a hashmap contains directories to be preloaded and all file names in that directory
+   */
+  private[yarn] def directoriesToBePreloaded(jars: Seq[String]): HashMap[URI, HashSet[String]] = {

Review Comment:
   `jars` -> `files`



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala:
##########
@@ -462,6 +462,30 @@ package object config extends Logging {
     .stringConf
     .createWithDefault("yarn.io/fpga")
 
+  private[spark] val YARN_CLIENT_STAT_CACHE_PRELOAD_ENABLED =
+    ConfigBuilder("spark.yarn.client.statCache.preload.enabled")
+    .doc("This configuration enables statCache to be preloaded at YARN client side. This feature " +
+      "analyzes the pattern of resources paths, and if multiple resources shared the same parent " +
+      "directory, a single <code>listStatus</code> will be invoked on the parent directory " +
+      "instead of multiple <code>getFileStatus</code> performed on each individual resources. " +

Review Comment:
   nit:
   ```suggestion
         "instead of multiple <code>getFileStatus</code> on individual resources. " +
   ```



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala:
##########
@@ -462,6 +462,30 @@ package object config extends Logging {
     .stringConf
     .createWithDefault("yarn.io/fpga")
 
+  private[spark] val YARN_CLIENT_STAT_CACHE_PRELOAD_ENABLED =
+    ConfigBuilder("spark.yarn.client.statCache.preload.enabled")
+    .doc("This configuration enables statCache to be preloaded at YARN client side. This feature " +
+      "analyzes the pattern of resources paths, and if multiple resources shared the same parent " +
+      "directory, a single <code>listStatus</code> will be invoked on the parent directory " +
+      "instead of multiple <code>getFileStatus</code> performed on each individual resources. " +
+      "If most resources from a small set of directories, this can substantially improve job " +
+      "submission time. Enabling this feature may potentially increase client memory overhead.")
+    .version("4.0.0")
+    .booleanConf
+    .createWithDefault(false)
+
+  private[spark] val YARN_CLIENT_STAT_CACHE_PRELOAD_PER_DIRECTORY_THRESHOLD =
+    ConfigBuilder("spark.yarn.client.statCache.preload.perDirectoryThreshold")
+      .doc("This configuration defines the threshold for the number of resources in a directory " +
+        "that triggers the activation of the statCache preloading. When the count of individual " +
+        "resources specified by <code>spark.yarn.jars</code> within a directory is no less than " +
+        "this threshold, the statCache preloading for that directory will be activated. It's " +
+        "important to note that this configuration will only take effect when the " +
+        "<code>spark.yarn.client.statCache.preloaded.enabled</code> option is enabled.")

Review Comment:
   ```suggestion
         .doc("Minimum resource count in a directory to trigger statCache preloading when " +
           "submitting an application. If the number of resources in a directory, without " +
           "any wildcards, equals or exceeds this threshold, the statCache for that directory " +
           "will be preloaded. This configuration will only take effect when " +
           "<code>spark.yarn.client.statCache.preloaded.enabled</code> option is enabled.")
   ```



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala:
##########
@@ -462,6 +462,30 @@ package object config extends Logging {
     .stringConf
     .createWithDefault("yarn.io/fpga")
 
+  private[spark] val YARN_CLIENT_STAT_CACHE_PRELOAD_ENABLED =
+    ConfigBuilder("spark.yarn.client.statCache.preload.enabled")
+    .doc("This configuration enables statCache to be preloaded at YARN client side. This feature " +
+      "analyzes the pattern of resources paths, and if multiple resources shared the same parent " +

Review Comment:
   ```suggestion
         "analyzes the pattern of resources paths, and if multiple resources share the same parent " +
   ```



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala:
##########
@@ -462,6 +462,30 @@ package object config extends Logging {
     .stringConf
     .createWithDefault("yarn.io/fpga")
 
+  private[spark] val YARN_CLIENT_STAT_CACHE_PRELOAD_ENABLED =
+    ConfigBuilder("spark.yarn.client.statCache.preload.enabled")
+    .doc("This configuration enables statCache to be preloaded at YARN client side. This feature " +
+      "analyzes the pattern of resources paths, and if multiple resources shared the same parent " +
+      "directory, a single <code>listStatus</code> will be invoked on the parent directory " +
+      "instead of multiple <code>getFileStatus</code> performed on each individual resources. " +
+      "If most resources from a small set of directories, this can substantially improve job " +
+      "submission time. Enabling this feature may potentially increase client memory overhead.")

Review Comment:
   How substantial is the impact on memory overhead?
   I am assuming this is applicable when an extremely large number of files is present in the parent directory?
   If yes, did you explore alternatives to `listStatus` which might help?
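   
   One such alternative might be `listStatusIterator` (sketch with an invented path), which streams entries instead of materializing the whole listing as an array:
   
   ```scala
   import org.apache.hadoop.conf.Configuration
   import org.apache.hadoop.fs.{FileSystem, Path}
   
   val fs = FileSystem.get(new Configuration())
   val it = fs.listStatusIterator(new Path("/user/spark/libs"))
   while (it.hasNext) {
     val status = it.next()
     if (status.isFile) {
       // consider this entry for the statCache
     }
   }
   ```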
   





[GitHub] [spark] shuwang21 commented on a diff in pull request #42357: [SPARK-44306][YARN] Group FileStatus with few RPC calls within Yarn Client

Posted by "shuwang21 (via GitHub)" <gi...@apache.org>.
shuwang21 commented on code in PR #42357:
URL: https://github.com/apache/spark/pull/42357#discussion_r1328148388


##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala:
##########
@@ -462,6 +462,30 @@ package object config extends Logging {
     .stringConf
     .createWithDefault("yarn.io/fpga")
 
+  private[spark] val YARN_CLIENT_STAT_CACHE_PRELOAD_ENABLED =
+    ConfigBuilder("spark.yarn.client.statCache.preload.enabled")
+    .doc("This configuration enables statCache to be preloaded at YARN client side. This feature " +
+      "analyzes the pattern of resources paths, and if multiple resources shared the same parent " +
+      "directory, a single <code>listStatus</code> will be invoked on the parent directory " +
+      "instead of multiple <code>getFileStatus</code> performed on each individual resources. " +
+      "If most resources from a small set of directories, this can substantially improve job " +
+      "submission time. Enabling this feature may potentially increase client memory overhead.")

Review Comment:
   Optimized with `PathFilter` and validated internally. HDFS now returns only the needed fileStatus entries instead of every fileStatus in the directory, which limits memory consumption.




[GitHub] [spark] shuwang21 commented on a diff in pull request #42357: [SPARK-44306][YARN] Group FileStatus with few RPC calls within Yarn Client

Posted by "shuwang21 (via GitHub)" <gi...@apache.org>.
shuwang21 commented on code in PR #42357:
URL: https://github.com/apache/spark/pull/42357#discussion_r1291038823


##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +462,51 @@ private[spark] class Client(
     new Path(resolvedDestDir, qualifiedDestPath.getName())
   }
 
+  /**
+   * For each non-local and non-glob resource, we will count its parent directory. If its
+   * frequency is larger than the threshold specified by
+   * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, all the file status from the
+   * directory will be preloaded.
+   *
+   * @param jars : the list of jars to upload
+   * @return a list of directories to be preloaded
+   * */
+  def directoriesToBePreloaded(jars: Seq[String]): mutable.Iterable[URI] = {
+    val directoryCounter = new HashMap[URI, Int]()
+    jars.foreach { jar =>
+      if (!Utils.isLocalUri(jar) && !new GlobPattern(jar).hasWildcard) {
+        val path = getQualifiedLocalPath(Utils.resolveURI(jar), hadoopConf)
+        val parentUri = path.getParent.toUri
+        directoryCounter.update(parentUri, directoryCounter.getOrElse(parentUri, 0) + 1)
+      }
+    }
+    directoryCounter.collect {
+      case (uri, count) if count >= perDirectoryThreshold => uri
+    }
+  }
+
+  /**
+   * Preload the statCache with file status. For each directory in the list, we will list all
+   * files from that directory and add them to the statCache.
+   *
+   * @param fs : the target file system
+   * @param statCache : stat cache to be preloaded with fileStatus
+   */
+  def statCachePreload(fs: FileSystem, statCache: Map[URI, FileStatus]): Unit = {
+    logDebug("Preload the following directories")
+    val directory = sparkConf.get(SPARK_JARS)
+      .map(directoriesToBePreloaded)
+      .getOrElse(ArrayBuffer.empty[URI])
+
+    directory.foreach { dir =>
+      fs.listStatus(new Path(dir)).filter(_.isFile()).foreach { fileStatus =>

Review Comment:
   Nice catch. Revised code and UT accordingly. 





[GitHub] [spark] shuwang21 commented on a diff in pull request #42357: [SPARK-44306][YARN] Group FileStatus with few RPC calls within Yarn Client

Posted by "shuwang21 (via GitHub)" <gi...@apache.org>.
shuwang21 commented on code in PR #42357:
URL: https://github.com/apache/spark/pull/42357#discussion_r1292893433


##########
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala:
##########
@@ -666,6 +666,42 @@ class ClientSuite extends SparkFunSuite with Matchers {
     assertUserClasspathUrls(cluster = true, replacementRootPath)
   }
 
+  test("SPARK-44306: test directoriesToBePreloaded") {
+    val sparkConf = new SparkConf()
+      .set(YARN_CLIENT_STAT_CACHE_PRELOAD_PER_DIRECTORY_THRESHOLD, 3)
+    val client = createClient(sparkConf, args = Array("--jar", USER))
+    val directories = client.directoriesToBePreloaded(Seq(
+      "hdfs:/valid/a.jar",
+      "hdfs:/valid/b.jar",
+      "hdfs:/valid/c.jar",
+      "s3:/valid/a.jar",
+      "s3:/valid/b.jar",
+      "s3:/valid/c.jar",
+      "hdfs:/glob/*",
+      "hdfs:/fewer/a.jar",
+      "hdfs:/fewer/b.jar",
+      "local:/local/a.jar",
+      "local:/local/b.jar",
+      "local:/local/c.jar"))
+    assert(directories.size == 2 && directories.contains(new URI("hdfs:/valid"))
+      && directories.contains(new URI("s3:/valid")))

Review Comment:
   Maybe not needed? We already check `directories.size == 2`, and `directories` contains exactly `hdfs:/valid` and `s3:/valid`; if so, `local:/local` cannot be found in `directories`.
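   
   For what it's worth, a set-equality assertion would pin this down in one line (a sketch against the same `directories` value, reusing the test's `java.net.URI` import):
   ```scala
   // Set equality fixes the exact contents, so a separate
   // "does not contain local:/local" check would be redundant.
   assert(directories.toSet == Set(new URI("hdfs:/valid"), new URI("s3:/valid")))
   ```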





[GitHub] [spark] mridulm commented on a diff in pull request #42357: [SPARK-44306][YARN] Group FileStatus with few RPC calls within Yarn Client

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #42357:
URL: https://github.com/apache/spark/pull/42357#discussion_r1318064457


##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -492,10 +492,8 @@ private[spark] class Client(
   private[yarn] def getPreloadedStatCache(
       fsLookup: URI => FileSystem = FileSystem.get(_, hadoopConf)): HashMap[URI, FileStatus] = {
     val statCache = HashMap[URI, FileStatus]()
-    val jars = sparkConf.get(SPARK_JARS)
-    val directoryToFiles = jars.map(directoriesToBePreloaded).getOrElse(HashMap.empty)
-
-    directoryToFiles.foreach { case (dir: URI, filesInDir: HashSet[String]) =>
+    val jars = sparkConf.get(JARS_TO_DISTRIBUTE)

Review Comment:
   This does not look right - why are we changing the config used?





[GitHub] [spark] mridulm commented on a diff in pull request #42357: [SPARK-44306][YARN] Group FileStatus with few RPC calls within Yarn Client

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #42357:
URL: https://github.com/apache/spark/pull/42357#discussion_r1318073365


##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +461,51 @@ private[spark] class Client(
     new Path(resolvedDestDir, qualifiedDestPath.getName())
   }
 
+  /**
+   * For each non-local and non-glob resource, we will count its parent directory. If its
+   * frequency is larger than the threshold specified by
+   * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, the corresponding file status
+   * from the directory will be preloaded.
+   *
+   * @param jars : the list of jars to upload
+   * @return a hashmap contains directories to be preloaded and all file names in that directory
+   * */
+  private[yarn] def directoriesToBePreloaded(jars: Seq[String]): HashMap[URI, Set[String]] = {
+    val directoryToFiles = new HashMap[URI, Set[String]]()
+    jars.foreach { jar =>
+      if (!Utils.isLocalUri(jar) && !new GlobPattern(jar).hasWildcard) {
+        val currentUri = Utils.resolveURI(jar)
+        val parentUri = new Path(currentUri).getParent.toUri
+        directoryToFiles.update(parentUri, directoryToFiles.getOrElse(parentUri, Set.empty)
+          .union(Set(currentUri.normalize().toString)))
+      }
+    }
+    directoryToFiles.filter(_._2.size >= statCachePreloadDirectoryCountThreshold)
+  }
+
+  /**
+   * Preload the statCache with file status. List all files from that directory and add them to the
+   * statCache.
+   *
+   * @param fsLookup: Function for looking up an FS based on a URI; override for testing
+   * @return A preloaded statCache with fileStatus
+   */
+  private[yarn] def getPreloadedStatCache(
+      fsLookup: URI => FileSystem = FileSystem.get(_, hadoopConf)): HashMap[URI, FileStatus] = {
+    val statCache = HashMap[URI, FileStatus]()
+    val jars = sparkConf.get(SPARK_JARS)
+    val directoryToFiles = jars.map(directoriesToBePreloaded).getOrElse(HashMap.empty)
+
+    directoryToFiles.foreach { case (dir: URI, filesInDir: Set[String]) =>
+      fsLookup(dir).listStatus(new Path(dir)).filter(_.isFile()).
+        filter(f => filesInDir.contains(f.getPath.toString)).foreach { fileStatus =>
+          val uri = fileStatus.getPath.toUri
+          statCache.put(uri, fileStatus)

Review Comment:
   Agree, this will get handled below as well - I was trying to see if we can short-circuit it here, since we know the parent will be required to be queried.
   It is fine to defer it.





[GitHub] [spark] shuwang21 commented on a diff in pull request #42357: [SPARK-44306][YARN] Group FileStatus with few RPC calls within Yarn Client

Posted by "shuwang21 (via GitHub)" <gi...@apache.org>.
shuwang21 commented on code in PR #42357:
URL: https://github.com/apache/spark/pull/42357#discussion_r1295728781


##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala:
##########
@@ -462,6 +462,30 @@ package object config extends Logging {
     .stringConf
     .createWithDefault("yarn.io/fpga")
 
+  private[spark] val YARN_CLIENT_STAT_CACHE_PRELOAD_ENABLED =
+    ConfigBuilder("spark.yarn.client.statCache.preload.enabled")
+    .doc("This configuration enables statCache to be preloaded at YARN client side. This feature" +
+      " analyzes the pattern of resources paths. If multiple resources shared the same parent" +
+      " directory, a single <code>listStatus</code> will be invoked on the parent directory" +
+      " instead of multiple <code>getFileStatus</code> performed on each individual resources." +
+      " If most resources from a small set of directories, this can substantially improve job" +
+      " submission time. Enabling this feature may potentially increase client memory overhead.")
+    .version("4.0.0")
+    .booleanConf
+    .createWithDefault(false)
+
+  private[spark] val YARN_CLIENT_STAT_CACHE_PRELOAD_PER_DIRECTORY_THRESHOLD =
+    ConfigBuilder("spark.yarn.client.statCache.preload.perDirectoryThreshold")
+      .doc("This configuration defines the threshold for the number of resources in a directory" +
+        " that triggers the activation of the statCache preloading. When the count of individual" +

Review Comment:
   Yeah. I am working on some fixes.





[GitHub] [spark] mridulm closed pull request #42357: [SPARK-44306][YARN] Group FileStatus with few RPC calls within Yarn Client

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm closed pull request #42357: [SPARK-44306][YARN] Group FileStatus with few RPC calls within Yarn Client
URL: https://github.com/apache/spark/pull/42357




[GitHub] [spark] mridulm commented on a diff in pull request #42357: [SPARK-44306][YARN] Group FileStatus with few RPC calls within Yarn Client

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #42357:
URL: https://github.com/apache/spark/pull/42357#discussion_r1295720900


##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala:
##########
@@ -462,6 +462,30 @@ package object config extends Logging {
     .stringConf
     .createWithDefault("yarn.io/fpga")
 
+  private[spark] val YARN_CLIENT_STAT_CACHE_PRELOAD_ENABLED =
+    ConfigBuilder("spark.yarn.client.statCache.preload.enabled")
+    .doc("This configuration enables statCache to be preloaded at YARN client side. This feature" +
+      " analyzes the pattern of resources paths. If multiple resources shared the same parent" +
+      " directory, a single <code>listStatus</code> will be invoked on the parent directory" +
+      " instead of multiple <code>getFileStatus</code> performed on each individual resources." +
+      " If most resources from a small set of directories, this can substantially improve job" +
+      " submission time. Enabling this feature may potentially increase client memory overhead.")
+    .version("4.0.0")
+    .booleanConf
+    .createWithDefault(false)
+
+  private[spark] val YARN_CLIENT_STAT_CACHE_PRELOAD_PER_DIRECTORY_THRESHOLD =
+    ConfigBuilder("spark.yarn.client.statCache.preload.perDirectoryThreshold")
+      .doc("This configuration defines the threshold for the number of resources in a directory" +
+        " that triggers the activation of the statCache preloading. When the count of individual" +

Review Comment:
   Please do not apply this verbatim :-)
   This was illustrative of how to fix it





[GitHub] [spark] mridulm commented on a diff in pull request #42357: [SPARK-44306][YARN] Group FileStatus with few RPC calls within Yarn Client

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #42357:
URL: https://github.com/apache/spark/pull/42357#discussion_r1294151353


##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +461,52 @@ private[spark] class Client(
     new Path(resolvedDestDir, qualifiedDestPath.getName())
   }
 
+  /**
+   * For each non-local and non-glob resource, we will count its parent directory. If its
+   * frequency is larger than the threshold specified by
+   * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, all the file status from the
+   * directory will be preloaded.
+   *
+   * @param jars : the list of jars to upload
+   * @return a list of directories to be preloaded
+   * */
+  private[yarn] def directoriesToBePreloaded(jars: Seq[String]): List[URI] = {
+    val directoryCounter = new HashMap[URI, Int]()
+    jars.foreach { jar =>
+      if (!Utils.isLocalUri(jar) && !new GlobPattern(jar).hasWildcard) {
+        val parentUri = new Path(Utils.resolveURI(jar)).getParent.toUri
+        directoryCounter.update(parentUri, directoryCounter.getOrElse(parentUri, 0) + 1)
+      }
+    }
+    directoryCounter.filter(_._2 >= statCachePreloadDirectoryCountThreshold).keys.toList
+  }
+
+  /**
+   * Preload the statCache with file status. For each directory in the list, we will list all
+   * files from that directory and add them to the statCache.
+   *
+   * @param fsLookup: Function for looking up an FS based on a URI; override for testing
+   * @return A preloaded statCache with fileStatus
+   */
+  private[yarn] def getPreloadedStatCache(
+      fsLookup: URI => FileSystem = FileSystem.get(_, hadoopConf)): HashMap[URI, FileStatus] = {
+    val statCache = HashMap[URI, FileStatus]()
+    val jars = sparkConf.get(SPARK_JARS)
+    val directories = jars.map(directoriesToBePreloaded).getOrElse(ArrayBuffer.empty[URI])
+
+    directories.foreach { dir =>
+      fsLookup(dir).listStatus(new Path(dir)).filter(_.isFile())
+        .foreach { fileStatus =>
+          val uri = fileStatus.getPath.toUri
+          if (jars.exists(_.contains(uri.toString))) {
+            logDebug(s"Add ${uri} file status to statCache.")
+            statCache.put(uri, fileStatus)
+          }
+        }
+    }

Review Comment:
   Perhaps my comment was not very clear.
   You can test what I am referring to by checking for something with unresolved path in spark.jar - for example, something like `hdfs://host:123/a/b/c/../file`
   
   The `jars.exists(_.contains(uri.toString))` check will return false - and so not cache this.
   This pattern is quite common when users are specifying jars (use of `..`, `./`, etc.)





[GitHub] [spark] shuwang21 commented on a diff in pull request #42357: [SPARK-44306][YARN] Group FileStatus with few RPC calls within Yarn Client

Posted by "shuwang21 (via GitHub)" <gi...@apache.org>.
shuwang21 commented on code in PR #42357:
URL: https://github.com/apache/spark/pull/42357#discussion_r1291926490


##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -471,38 +470,40 @@ private[spark] class Client(
    * @param jars : the list of jars to upload
    * @return a list of directories to be preloaded
    * */
-  def directoriesToBePreloaded(jars: Seq[String]): mutable.Iterable[URI] = {
+  private[yarn] def directoriesToBePreloaded(jars: Seq[String]): List[URI] = {
     val directoryCounter = new HashMap[URI, Int]()
     jars.foreach { jar =>
       if (!Utils.isLocalUri(jar) && !new GlobPattern(jar).hasWildcard) {
-        val path = getQualifiedLocalPath(Utils.resolveURI(jar), hadoopConf)
-        val parentUri = path.getParent.toUri
+        val parentUri = new Path (Utils.resolveURI(jar)).getParent.toUri
         directoryCounter.update(parentUri, directoryCounter.getOrElse(parentUri, 0) + 1)
       }
     }
-    directoryCounter.filter(_._2 >= perDirectoryThreshold).keys
+    directoryCounter.filter(_._2 >= statCachePreloadDirectoryCountThreshold).keys.toList
   }
 
   /**
    * Preload the statCache with file status. For each directory in the list, we will list all
    * files from that directory and add them to the statCache.
-   *
-   * @param fs : the target file system
-   * @param statCache : stat cache to be preloaded with fileStatus
+   * @param testFS : the file system to be used for testing only (optional)
+   * @return A preloaded statCache with fileStatus
    */
-  def statCachePreload(fs: FileSystem, statCache: Map[URI, FileStatus]): Unit = {
-    logDebug("Preload the following directories")
-    val directory = sparkConf.get(SPARK_JARS)
-      .map(directoriesToBePreloaded)
-      .getOrElse(ArrayBuffer.empty[URI])
-
-    directory.foreach { dir =>
-      fs.listStatus(new Path(dir)).filter(_.isFile()).foreach { fileStatus =>
-        val uri = fileStatus.getPath.toUri
-        logDebug(s"add ${uri} plan to added to stat cache.")
-        statCache.put(uri, fileStatus)
-      }
+  private[yarn] def getPreloadedStatCache(testFS: Option[FileSystem]):
+  HashMap[URI, FileStatus] = {
+    val statCache = HashMap[URI, FileStatus]()
+    val jars = sparkConf.get(SPARK_JARS)
+    val directories = jars.map(directoriesToBePreloaded).getOrElse(ArrayBuffer.empty[URI])
+
+    directories.foreach { dir =>
+      testFS.getOrElse(FileSystem.get(dir, hadoopConf)).listStatus(new Path(dir)).filter(_.isFile())

Review Comment:
   Yeah. Thanks for the suggestion. Much better than the `testFS` one.





[GitHub] [spark] mridulm commented on a diff in pull request #42357: [SPARK-44306][YARN] Group FileStatus with few RPC calls within Yarn Client

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #42357:
URL: https://github.com/apache/spark/pull/42357#discussion_r1318070853


##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -492,10 +492,8 @@ private[spark] class Client(
   private[yarn] def getPreloadedStatCache(
       fsLookup: URI => FileSystem = FileSystem.get(_, hadoopConf)): HashMap[URI, FileStatus] = {
     val statCache = HashMap[URI, FileStatus]()
-    val jars = sparkConf.get(SPARK_JARS)
-    val directoryToFiles = jars.map(directoriesToBePreloaded).getOrElse(HashMap.empty)
-
-    directoryToFiles.foreach { case (dir: URI, filesInDir: HashSet[String]) =>
+    val jars = sparkConf.get(JARS_TO_DISTRIBUTE)

Review Comment:
   Note - it is fine to ignore one-off cases, like app-jar, ivy, etc.





[GitHub] [spark] mridulm commented on a diff in pull request #42357: [SPARK-44306][YARN] Group FileStatus with few RPC calls within Yarn Client

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #42357:
URL: https://github.com/apache/spark/pull/42357#discussion_r1318069261


##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -492,10 +492,8 @@ private[spark] class Client(
   private[yarn] def getPreloadedStatCache(
       fsLookup: URI => FileSystem = FileSystem.get(_, hadoopConf)): HashMap[URI, FileStatus] = {
     val statCache = HashMap[URI, FileStatus]()
-    val jars = sparkConf.get(SPARK_JARS)
-    val directoryToFiles = jars.map(directoriesToBePreloaded).getOrElse(HashMap.empty)
-
-    directoryToFiles.foreach { case (dir: URI, filesInDir: HashSet[String]) =>
+    val jars = sparkConf.get(JARS_TO_DISTRIBUTE)

Review Comment:
   (Editing comment:)
   
   I am not sure I understood the comment :-)
   `statCache` is used within `prepareLocalResources` - within that method, it is used in the context of `SPARK_JARS` and the `distribute` method - so why use it only for `JARS_TO_DISTRIBUTE`?
   
   Or in other words, if it applies to both - why only `JARS_TO_DISTRIBUTE` and not all relevant configs? (`SPARK_JARS`, `JARS_TO_DISTRIBUTE`, `FILES_TO_DISTRIBUTE`, `ARCHIVES_TO_DISTRIBUTE`, `pySparkArchives`, `PY_FILES`)








[GitHub] [spark] xkrogen commented on a diff in pull request #42357: [SPARK-44306][YARN] Group FileStatus with few RPC calls within Yarn Client

Posted by "xkrogen (via GitHub)" <gi...@apache.org>.
xkrogen commented on code in PR #42357:
URL: https://github.com/apache/spark/pull/42357#discussion_r1290346200


##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +462,51 @@ private[spark] class Client(
     new Path(resolvedDestDir, qualifiedDestPath.getName())
   }
 
+  /**
+   * For each non-local and non-glob resource, we will count its parent directory. If its
+   * frequency is larger than the threshold specified by
+   * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, all the file status from the
+   * directory will be preloaded.
+   *
+   * @param jars : the list of jars to upload
+   * @return a list of directories to be preloaded
+   * */
+  def directoriesToBePreloaded(jars: Seq[String]): mutable.Iterable[URI] = {
+    val directoryCounter = new HashMap[URI, Int]()
+    jars.foreach { jar =>
+      if (!Utils.isLocalUri(jar) && !new GlobPattern(jar).hasWildcard) {
+        val path = getQualifiedLocalPath(Utils.resolveURI(jar), hadoopConf)
+        val parentUri = path.getParent.toUri
+        directoryCounter.update(parentUri, directoryCounter.getOrElse(parentUri, 0) + 1)
+      }
+    }
+    directoryCounter.collect {
+      case (uri, count) if count >= perDirectoryThreshold => uri
+    }
+  }
+
+  /**
+   * Preload the statCache with file status. For each directory in the list, we will list all
+   * files from that directory and add them to the statCache.
+   *
+   * @param fs : the target file system
+   * @param statCache : stat cache to be preloaded with fileStatus
+   */
+  def statCachePreload(fs: FileSystem, statCache: Map[URI, FileStatus]): Unit = {
+    logDebug("Preload the following directories")
+    val directory = sparkConf.get(SPARK_JARS)
+      .map(directoriesToBePreloaded)
+      .getOrElse(ArrayBuffer.empty[URI])
+
+    directory.foreach { dir =>
+      fs.listStatus(new Path(dir)).filter(_.isFile()).foreach { fileStatus =>

Review Comment:
   Shouldn't we filter to only files in `sparkConf.get(SPARK_JARS)`? Envision we have directory `/foo` with three files `a`, `b`, and `c`, and `spark.jars = a,b`. With the current code, won't we now also load `c`?
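   
   A sketch of one possible guard (hypothetical helper, not the PR code), assuming the configured jar file names are first collected into a set:
   ```scala
   import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
   
   // List a directory but keep only the configured file names, so an
   // unrelated sibling such as c is never pulled into the statCache.
   def listConfigured(fs: FileSystem, dir: Path, wanted: Set[String]): Array[FileStatus] =
     fs.listStatus(dir).filter(_.isFile()).filter(s => wanted.contains(s.getPath.getName))
   ```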



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +462,51 @@ private[spark] class Client(
     new Path(resolvedDestDir, qualifiedDestPath.getName())
   }
 
+  /**
+   * For each non-local and non-glob resource, we will count its parent directory. If its
+   * frequency is larger than the threshold specified by
+   * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, all the file status from the
+   * directory will be preloaded.
+   *
+   * @param jars : the list of jars to upload
+   * @return a list of directories to be preloaded
+   * */
+  def directoriesToBePreloaded(jars: Seq[String]): mutable.Iterable[URI] = {
+    val directoryCounter = new HashMap[URI, Int]()
+    jars.foreach { jar =>
+      if (!Utils.isLocalUri(jar) && !new GlobPattern(jar).hasWildcard) {
+        val path = getQualifiedLocalPath(Utils.resolveURI(jar), hadoopConf)
+        val parentUri = path.getParent.toUri
+        directoryCounter.update(parentUri, directoryCounter.getOrElse(parentUri, 0) + 1)
+      }
+    }
+    directoryCounter.collect {
+      case (uri, count) if count >= perDirectoryThreshold => uri
+    }

Review Comment:
   minor nit: you can make this a little more concise like:
   ```suggestion
       directoryCounter.filter(_._2 >= perDirectoryThreshold).keys
   ```
   (feel free to ignore if you feel readability is negatively impacted)



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +462,51 @@ private[spark] class Client(
     new Path(resolvedDestDir, qualifiedDestPath.getName())
   }
 
+  /**
+   * For each non-local and non-glob resource, we will count its parent directory. If its
+   * frequency is larger than the threshold specified by
+   * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, all the file status from the
+   * directory will be preloaded.
+   *
+   * @param jars : the list of jars to upload
+   * @return a list of directories to be preloaded
+   * */
+  def directoriesToBePreloaded(jars: Seq[String]): mutable.Iterable[URI] = {
+    val directoryCounter = new HashMap[URI, Int]()
+    jars.foreach { jar =>
+      if (!Utils.isLocalUri(jar) && !new GlobPattern(jar).hasWildcard) {
+        val path = getQualifiedLocalPath(Utils.resolveURI(jar), hadoopConf)
+        val parentUri = path.getParent.toUri
+        directoryCounter.update(parentUri, directoryCounter.getOrElse(parentUri, 0) + 1)
+      }
+    }
+    directoryCounter.collect {
+      case (uri, count) if count >= perDirectoryThreshold => uri
+    }
+  }
+
+  /**
+   * Preload the statCache with file status. For each directory in the list, we will list all
+   * files from that directory and add them to the statCache.
+   *
+   * @param fs : the target file system
+   * @param statCache : stat cache to be preloaded with fileStatus
+   */
+  def statCachePreload(fs: FileSystem, statCache: Map[URI, FileStatus]): Unit = {
+    logDebug("Preload the following directories")

Review Comment:
   Did you mean to log some directory names here?



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +462,51 @@ private[spark] class Client(
     new Path(resolvedDestDir, qualifiedDestPath.getName())
   }
 
+  /**
+   * For each non-local and non-glob resource, we will count its parent directory. If its
+   * frequency is larger than the threshold specified by
+   * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, all the file status from the
+   * directory will be preloaded.
+   *
+   * @param jars : the list of jars to upload
+   * @return a list of directories to be preloaded
+   * */
+  def directoriesToBePreloaded(jars: Seq[String]): mutable.Iterable[URI] = {
+    val directoryCounter = new HashMap[URI, Int]()
+    jars.foreach { jar =>
+      if (!Utils.isLocalUri(jar) && !new GlobPattern(jar).hasWildcard) {
+        val path = getQualifiedLocalPath(Utils.resolveURI(jar), hadoopConf)
+        val parentUri = path.getParent.toUri
+        directoryCounter.update(parentUri, directoryCounter.getOrElse(parentUri, 0) + 1)
+      }
+    }
+    directoryCounter.collect {
+      case (uri, count) if count >= perDirectoryThreshold => uri
+    }
+  }
+
+  /**
+   * Preload the statCache with file status. For each directory in the list, we will list all
+   * files from that directory and add them to the statCache.
+   *
+   * @param fs : the target file system
+   * @param statCache : stat cache to be preloaded with fileStatus
+   */
+  def statCachePreload(fs: FileSystem, statCache: Map[URI, FileStatus]): Unit = {
+    logDebug("Preload the following directories")
+    val directory = sparkConf.get(SPARK_JARS)

Review Comment:
   `directories`? this is a list, not a single dir, right?



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +462,51 @@ private[spark] class Client(
     new Path(resolvedDestDir, qualifiedDestPath.getName())
   }
 
+  /**
+   * For each non-local and non-glob resource, we will count its parent directory. If its
+   * frequency is larger than the threshold specified by
+   * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, all the file status from the
+   * directory will be preloaded.
+   *
+   * @param jars : the list of jars to upload
+   * @return a list of directories to be preloaded
+   * */
+  def directoriesToBePreloaded(jars: Seq[String]): mutable.Iterable[URI] = {
+    val directoryCounter = new HashMap[URI, Int]()
+    jars.foreach { jar =>
+      if (!Utils.isLocalUri(jar) && !new GlobPattern(jar).hasWildcard) {
+        val path = getQualifiedLocalPath(Utils.resolveURI(jar), hadoopConf)
+        val parentUri = path.getParent.toUri
+        directoryCounter.update(parentUri, directoryCounter.getOrElse(parentUri, 0) + 1)
+      }
+    }
+    directoryCounter.collect {
+      case (uri, count) if count >= perDirectoryThreshold => uri
+    }
+  }
+
+  /**
+   * Preload the statCache with file status. For each directory in the list, we will list all
+   * files from that directory and add them to the statCache.
+   *
+   * @param fs : the target file system
+   * @param statCache : stat cache to be preloaded with fileStatus
+   */
+  def statCachePreload(fs: FileSystem, statCache: Map[URI, FileStatus]): Unit = {
+    logDebug("Preload the following directories")
+    val directory = sparkConf.get(SPARK_JARS)
+      .map(directoriesToBePreloaded)
+      .getOrElse(ArrayBuffer.empty[URI])
+
+    directory.foreach { dir =>
+      fs.listStatus(new Path(dir)).filter(_.isFile()).foreach { fileStatus =>

Review Comment:
   You can't assume that it's the same FS for all of the directories. For example we could have:
   ```
   spark.jars = hdfs:///foo/a.jar,hdfs:///foo/b.jar,s3:///bar/c.jar,s3:///bar/d.jar
   ```
   Now our preload directories would be `hdfs:///foo` and `s3:///bar` -- two different filesystems. You have to load the correct one based on the URI scheme
   
   `fs` is the _destination_ FS for uploads, which will always be the same, but here we are loading from the _source_ FS, which can be different for each JAR/dir.
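   
   A sketch of the per-scheme lookup (hypothetical names; `FileSystem.get(uri, conf)` resolves an instance per scheme/authority and caches it):
   ```scala
   import java.net.URI
   import org.apache.hadoop.conf.Configuration
   import org.apache.hadoop.fs.{FileSystem, Path}
   
   // Resolve the source filesystem from each directory's own URI instead of
   // reusing the destination fs, so hdfs:// and s3:// dirs each get the right FS.
   def listPerSourceFs(dirs: Seq[URI], conf: Configuration): Unit =
     dirs.foreach { dir =>
       val srcFs = FileSystem.get(dir, conf)
       srcFs.listStatus(new Path(dir)).foreach(status => println(status.getPath))
     }
   ```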



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +462,51 @@ private[spark] class Client(
     new Path(resolvedDestDir, qualifiedDestPath.getName())
   }
 
+  /**
+   * For each non-local and non-glob resource, we will count its parent directory. If its
+   * frequency is larger than the threshold specified by
+   * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, all the file status from the
+   * directory will be preloaded.
+   *
+   * @param jars : the list of jars to upload
+   * @return a list of directories to be preloaded
+   * */
+  def directoriesToBePreloaded(jars: Seq[String]): mutable.Iterable[URI] = {
+    val directoryCounter = new HashMap[URI, Int]()
+    jars.foreach { jar =>
+      if (!Utils.isLocalUri(jar) && !new GlobPattern(jar).hasWildcard) {
+        val path = getQualifiedLocalPath(Utils.resolveURI(jar), hadoopConf)

Review Comment:
   Why `getQualifiedLocalPath()`? In this block we already know `!isLocalUri()`, so it's not local. Can't we just do `val parentUri = new Path(Utils.resolveURI(jar)).getParent.toUri` ?



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +462,51 @@ private[spark] class Client(
     new Path(resolvedDestDir, qualifiedDestPath.getName())
   }
 
+  /**
+   * For each non-local and non-glob resource, we will count its parent directory. If its
+   * frequency is larger than the threshold specified by
+   * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, all the file status from the
+   * directory will be preloaded.
+   *
+   * @param jars : the list of jars to upload
+   * @return a list of directories to be preloaded
+   * */
+  def directoriesToBePreloaded(jars: Seq[String]): mutable.Iterable[URI] = {
+    val directoryCounter = new HashMap[URI, Int]()
+    jars.foreach { jar =>
+      if (!Utils.isLocalUri(jar) && !new GlobPattern(jar).hasWildcard) {
+        val path = getQualifiedLocalPath(Utils.resolveURI(jar), hadoopConf)
+        val parentUri = path.getParent.toUri
+        directoryCounter.update(parentUri, directoryCounter.getOrElse(parentUri, 0) + 1)
+      }
+    }
+    directoryCounter.collect {
+      case (uri, count) if count >= perDirectoryThreshold => uri
+    }
+  }
+
+  /**
+   * Preload the statCache with file status. For each directory in the list, we will list all
+   * files from that directory and add them to the statCache.
+   *
+   * @param fs : the target file system
+   * @param statCache : stat cache to be preloaded with fileStatus
+   */
+  def statCachePreload(fs: FileSystem, statCache: Map[URI, FileStatus]): Unit = {

Review Comment:
   `private[yarn]` ?



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala:
##########
@@ -462,6 +462,31 @@ package object config extends Logging {
     .stringConf
     .createWithDefault("yarn.io/fpga")
 
+  private[spark] val YARN_CLIENT_STAT_CACHE_PRELOADED_ENABLED =
+    ConfigBuilder("spark.yarn.client.statCache.preloaded.enabled")
+    .doc("This configuration enables statCache to be preloaded at YARN client side. This feature " +
+      "analyzes the pattern of resources paths. If multiple resources shared the same parent " +
+      "directory, a single <code>listStatus</code> will be invoked on the parent directory " +
+      "instead of multiple <code>getFileStatus</code> performed on each individual resources. " +
+      "If most resources are from the same directory, this feature greatly reduces the RPC call, " +
+      "reduce job runtime, and improve the job runtime variation due to network delays. " +
+      "Noticing, this could potentially increase the memory overhead at client side.")
+    .version("3.5.0")
+    .booleanConf
+    .createWithDefault(false)
+
+  private[spark] val YARN_CLIENT_STAT_CACHE_PRELOADED_PER_DIRECTORY_THRESHOLD =
+    ConfigBuilder("spark.yarn.client.statCache.preloaded.perDirectoryThreshold")
+      .doc("This configuration defines the threshold for the number of resources in a directory" +
+        " that triggers the activation of the statCache preloading. When the count of individual" +
+        " resources specified by <code>spark.yarn.jars</code> within a directory is no less than" +
+        " this threshold, the statCache preloading for that directory will be activated. It's" +
+        " important to note that this configuration will only take effect when the" +
+        " <code>spark.yarn.client.statCache.preloaded.enabled</code> option is enabled.")
+      .version("3.5.0")
+      .longConf
+      .createWithDefault(0L)

Review Comment:
   Let's set a sensible default here? 0 means that even if every JAR is in a different directory, we'll load each one using `listStatus`, which probably isn't what we want. I would say something like 5 or 10 is probably reasonable as a starting point?
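   
   For example, opting in would then look something like this (a sketch, using the config keys from the diff above):
   ```scala
   import org.apache.spark.SparkConf
   
   object PreloadConfExample {
     // Hypothetical usage: enable preloading, and only trigger it for
     // directories holding at least 5 configured resources.
     val conf = new SparkConf()
       .set("spark.yarn.client.statCache.preloaded.enabled", "true")
       .set("spark.yarn.client.statCache.preloaded.perDirectoryThreshold", "5")
   }
   ```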



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala:
##########
@@ -462,6 +462,31 @@ package object config extends Logging {
     .stringConf
     .createWithDefault("yarn.io/fpga")
 
+  private[spark] val YARN_CLIENT_STAT_CACHE_PRELOADED_ENABLED =
+    ConfigBuilder("spark.yarn.client.statCache.preloaded.enabled")
+    .doc("This configuration enables statCache to be preloaded at YARN client side. This feature " +
+      "analyzes the pattern of resources paths. If multiple resources shared the same parent " +
+      "directory, a single <code>listStatus</code> will be invoked on the parent directory " +
+      "instead of multiple <code>getFileStatus</code> performed on each individual resources. " +
+      "If most resources are from the same directory, this feature greatly reduces the RPC call, " +
+      "reduce job runtime, and improve the job runtime variation due to network delays. " +

Review Comment:
   We won't actually decrease the runtime of the job itself, just of the submission/initialization of the job. Generally I think we should be careful about making too strong of a claim. How about "If most resources from a small set of directories, this can substantially improve job submission time."



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -77,6 +78,9 @@ private[spark] class Client(
   private val isClusterMode = sparkConf.get(SUBMIT_DEPLOY_MODE) == "cluster"
 
   private val isClientUnmanagedAMEnabled = sparkConf.get(YARN_UNMANAGED_AM) && !isClusterMode
+  private val statCachePreloadedEnabled = sparkConf.get(YARN_CLIENT_STAT_CACHE_PRELOADED_ENABLED)
+  private val perDirectoryThreshold =

Review Comment:
   `statCachePreloadDirectoryCountThreshold` or something similar? I think we need a `statCachePreload` prefix of some sort



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala:
##########
@@ -462,6 +462,31 @@ package object config extends Logging {
     .stringConf
     .createWithDefault("yarn.io/fpga")
 
+  private[spark] val YARN_CLIENT_STAT_CACHE_PRELOADED_ENABLED =
+    ConfigBuilder("spark.yarn.client.statCache.preloaded.enabled")
+    .doc("This configuration enables statCache to be preloaded at YARN client side. This feature " +
+      "analyzes the pattern of resources paths. If multiple resources shared the same parent " +
+      "directory, a single <code>listStatus</code> will be invoked on the parent directory " +
+      "instead of multiple <code>getFileStatus</code> performed on each individual resources. " +
+      "If most resources are from the same directory, this feature greatly reduces the RPC call, " +
+      "reduce job runtime, and improve the job runtime variation due to network delays. " +
+      "Noticing, this could potentially increase the memory overhead at client side.")
+    .version("3.5.0")
+    .booleanConf
+    .createWithDefault(false)
+
+  private[spark] val YARN_CLIENT_STAT_CACHE_PRELOADED_PER_DIRECTORY_THRESHOLD =
+    ConfigBuilder("spark.yarn.client.statCache.preloaded.perDirectoryThreshold")
+      .doc("This configuration defines the threshold for the number of resources in a directory" +
+        " that triggers the activation of the statCache preloading. When the count of individual" +
+        " resources specified by <code>spark.yarn.jars</code> within a directory is no less than" +
+        " this threshold, the statCache preloading for that directory will be activated. It's" +
+        " important to note that this configuration will only take effect when the" +
+        " <code>spark.yarn.client.statCache.preloaded.enabled</code> option is enabled.")
+      .version("3.5.0")
+      .longConf

Review Comment:
   int is probably sufficient? long would overflow our in-memory buffers anyway :)



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +462,51 @@ private[spark] class Client(
     new Path(resolvedDestDir, qualifiedDestPath.getName())
   }
 
+  /**
+   * For each non-local and non-glob resource, we will count its parent directory. If its
+   * frequency is larger than the threshold specified by
+   * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, all the file status from the
+   * directory will be preloaded.
+   *
+   * @param jars : the list of jars to upload
+   * @return a list of directories to be preloaded
+   * */
+  def directoriesToBePreloaded(jars: Seq[String]): mutable.Iterable[URI] = {

Review Comment:
   Maybe return a `Seq` or `List`? Even your Scaladoc says you're returning a list :) `Iterable` has some caveats like potentially only being iterable a single time, I generally wouldn't recommend passing it around unless you have a need to (e.g. you're generating elements on the fly and don't want to materialize the whole thing)
   
   also, this should be `private[yarn]` ?



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -583,6 +632,9 @@ private[spark] class Client(
       case _ => None
     }
 
+    // If preload is enabled, preload the statCache with the files in the directories
+    if (statCachePreloadedEnabled) { statCachePreload(fs, statCache) }

Review Comment:
   The control flow is a little confusing to me, where we initialize `statCache` up on L537, then access it while processing the Ivy settings and AM keytab file, _then_ populate it down here. How about we preload it when we initialize?
   ```scala
   val statCache = if (statCachePreloadEnabled) {
     getPreloadedStatCache(fs)
   } else {
     HashMap[URI, FileStatus]()
   }
   ```





[GitHub] [spark] mridulm commented on a diff in pull request #42357: [SPARK-44306][YARN] Group FileStatus with few RPC calls within Yarn Client

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #42357:
URL: https://github.com/apache/spark/pull/42357#discussion_r1318069261


##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -492,10 +492,8 @@ private[spark] class Client(
   private[yarn] def getPreloadedStatCache(
       fsLookup: URI => FileSystem = FileSystem.get(_, hadoopConf)): HashMap[URI, FileStatus] = {
     val statCache = HashMap[URI, FileStatus]()
-    val jars = sparkConf.get(SPARK_JARS)
-    val directoryToFiles = jars.map(directoriesToBePreloaded).getOrElse(HashMap.empty)
-
-    directoryToFiles.foreach { case (dir: URI, filesInDir: HashSet[String]) =>
+    val jars = sparkConf.get(JARS_TO_DISTRIBUTE)

Review Comment:
   I am not sure I understood the comment :-)
   `statCache` is used within `prepareLocalResources` - within that method, it is primarily used in the context of `SPARK_JARS` - so why change to a different config?





[GitHub] [spark] shuwang21 commented on pull request #42357: [SPARK-44306][YARN] Group FileStatus with few RPC calls within Yarn Client

Posted by "shuwang21 (via GitHub)" <gi...@apache.org>.
shuwang21 commented on PR #42357:
URL: https://github.com/apache/spark/pull/42357#issuecomment-1673143975

   cc: @mridulm @otterc @zhouyejoe @xkrogen Thanks.
   




[GitHub] [spark] shuwang21 commented on a diff in pull request #42357: [SPARK-44306][YARN] Group FileStatus with few RPC calls within Yarn Client

Posted by "shuwang21 (via GitHub)" <gi...@apache.org>.
shuwang21 commented on code in PR #42357:
URL: https://github.com/apache/spark/pull/42357#discussion_r1290850531


##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +462,51 @@ private[spark] class Client(
     new Path(resolvedDestDir, qualifiedDestPath.getName())
   }
 
+  /**
+   * For each non-local and non-glob resource, we will count its parent directory. If its
+   * frequency is larger than the threshold specified by
+   * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, all the file status from the
+   * directory will be preloaded.
+   *
+   * @param jars : the list of jars to upload
+   * @return a list of directories to be preloaded
+   * */
+  def directoriesToBePreloaded(jars: Seq[String]): mutable.Iterable[URI] = {
+    val directoryCounter = new HashMap[URI, Int]()
+    jars.foreach { jar =>
+      if (!Utils.isLocalUri(jar) && !new GlobPattern(jar).hasWildcard) {
+        val path = getQualifiedLocalPath(Utils.resolveURI(jar), hadoopConf)
+        val parentUri = path.getParent.toUri
+        directoryCounter.update(parentUri, directoryCounter.getOrElse(parentUri, 0) + 1)
+      }
+    }
+    directoryCounter.collect {
+      case (uri, count) if count >= perDirectoryThreshold => uri
+    }
+  }
+
+  /**
+   * Preload the statCache with file status. For each directory in the list, we will list all
+   * files from that directory and add them to the statCache.
+   *
+   * @param fs : the target file system
+   * @param statCache : stat cache to be preloaded with fileStatus
+   */
+  def statCachePreload(fs: FileSystem, statCache: Map[URI, FileStatus]): Unit = {
+    logDebug("Preload the following directories")

Review Comment:
   Remove this line. Within `foreach`, update the debug log instead.





[GitHub] [spark] mridulm commented on a diff in pull request #42357: [SPARK-44306][YARN] Group FileStatus with few RPC calls within Yarn Client

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #42357:
URL: https://github.com/apache/spark/pull/42357#discussion_r1329512411


##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -533,9 +536,12 @@ private[spark] class Client(
     // If preload is enabled, preload the statCache with the files in the directories
     val statCache = if (statCachePreloadEnabled) {
       // Consider only following configurations, as they involve the distribution of multiple files
-      val files = sparkConf.get(SPARK_JARS).orNull ++ sparkConf.get(JARS_TO_DISTRIBUTE) ++
-        sparkConf.get(FILES_TO_DISTRIBUTE) ++ sparkConf.get(ARCHIVES_TO_DISTRIBUTE) ++
-        sparkConf.get(PY_FILES) ++ pySparkArchives
+      var files = sparkConf.get(JARS_TO_DISTRIBUTE) ++ sparkConf.get(FILES_TO_DISTRIBUTE) ++
+        sparkConf.get(ARCHIVES_TO_DISTRIBUTE) ++ sparkConf.get(PY_FILES) ++ pySparkArchives
+      if (!sparkConf.get(SPARK_JARS).isEmpty) {

Review Comment:
   Why this `if` condition? We can directly add it to `files` in the previous line itself?



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -494,11 +494,14 @@ private[spark] class Client(
       fsLookup: URI => FileSystem = FileSystem.get(_, hadoopConf)): HashMap[URI, FileStatus] = {
     val statCache = HashMap[URI, FileStatus]()
     directoriesToBePreloaded(files).foreach { case (dir: URI, filesInDir: HashSet[String]) =>
-      fsLookup(dir).listStatus(new Path(dir)).filter(_.isFile()).
-        filter(f => filesInDir.contains(f.getPath.getName)).foreach { fileStatus =>
-          val uri = fileStatus.getPath.toUri
+      fsLookup(dir).listStatus(new Path(dir), new PathFilter() {
+        override def accept(path: Path): Boolean = filesInDir.contains(path.getName)
+      }).filter(_.isFile()).foreach { fileStatus =>
+        val uri = fileStatus.getPath.toUri
+        if (uri != null) {

Review Comment:
   IIRC `uri` can't be `null` - why was this condition added?





[GitHub] [spark] shuwang21 commented on a diff in pull request #42357: [SPARK-44306][YARN] Group FileStatus with few RPC calls within Yarn Client

Posted by "shuwang21 (via GitHub)" <gi...@apache.org>.
shuwang21 commented on code in PR #42357:
URL: https://github.com/apache/spark/pull/42357#discussion_r1295734605


##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +461,51 @@ private[spark] class Client(
     new Path(resolvedDestDir, qualifiedDestPath.getName())
   }
 
+  /**
+   * For each non-local and non-glob resource, we will count its parent directory. If its
+   * frequency is larger than the threshold specified by
+   * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, the corresponding file status
+   * from the directory will be preloaded.
+   *
+   * @param jars : the list of jars to upload
+   * @return a hashmap contains directories to be preloaded and all file names in that directory
+   * */
+  private[yarn] def directoriesToBePreloaded(jars: Seq[String]): HashMap[URI, Set[String]] = {
+    val directoryToFiles = new HashMap[URI, Set[String]]()
+    jars.foreach { jar =>
+      if (!Utils.isLocalUri(jar) && !new GlobPattern(jar).hasWildcard) {
+        val currentUri = Utils.resolveURI(jar)
+        val parentUri = new Path(currentUri).getParent.toUri
+        directoryToFiles.update(parentUri, directoryToFiles.getOrElse(parentUri, Set.empty)
+          .union(Set(currentUri.normalize().toString)))
+      }
+    }
+    directoryToFiles.filter(_._2.size >= statCachePreloadDirectoryCountThreshold)
+  }
+
+  /**
+   * Preload the statCache with file status. List all files from that directory and add them to the
+   * statCache.
+   *
+   * @param fsLookup: Function for looking up an FS based on a URI; override for testing
+   * @return A preloaded statCache with fileStatus
+   */
+  private[yarn] def getPreloadedStatCache(
+      fsLookup: URI => FileSystem = FileSystem.get(_, hadoopConf)): HashMap[URI, FileStatus] = {
+    val statCache = HashMap[URI, FileStatus]()
+    val jars = sparkConf.get(SPARK_JARS)
+    val directoryToFiles = jars.map(directoriesToBePreloaded).getOrElse(HashMap.empty)
+
+    directoryToFiles.foreach { case (dir: URI, filesInDir: Set[String]) =>
+      fsLookup(dir).listStatus(new Path(dir)).filter(_.isFile()).
+        filter(f => filesInDir.contains(f.getPath.toString)).foreach { fileStatus =>
+          val uri = fileStatus.getPath.toUri
+          statCache.put(uri, fileStatus)

Review Comment:
   We need the file status for every directory from the parent up to the root. I checked whether `listStatus` also returns an entry for the directory itself, and it seems that it does not. This means we have to use `getFileStatus` to get the `FileStatus` for the parent directory.
   
   We can also defer this operation: with this [PR](https://github.com/apache/spark/pull/41821), the parent directory will be cached within `ancestorsHaveExecutePermissions` if it is not yet in `statCache`.
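   
   As a side note, a minimal sketch (my assumption, not code from this PR) of caching the parent directory's `FileStatus` with a single extra `getFileStatus` call per preloaded directory:
   
   ```scala
   import java.net.URI
   import scala.collection.mutable.HashMap
   import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
   
   // listStatus does not return an entry for the directory itself, so fetch it
   // explicitly: one extra RPC per preloaded directory, not one per jar.
   def cacheParentStatus(
       fs: FileSystem,
       dir: URI,
       statCache: HashMap[URI, FileStatus]): Unit = {
     if (!statCache.contains(dir)) {
       statCache.put(dir, fs.getFileStatus(new Path(dir)))
     }
   }
   ```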
   





[GitHub] [spark] mridulm commented on a diff in pull request #42357: [SPARK-44306][YARN] Group FileStatus with few RPC calls within Yarn Client

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #42357:
URL: https://github.com/apache/spark/pull/42357#discussion_r1294151353


##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +461,52 @@ private[spark] class Client(
     new Path(resolvedDestDir, qualifiedDestPath.getName())
   }
 
+  /**
+   * For each non-local and non-glob resource, we will count its parent directory. If its
+   * frequency is larger than the threshold specified by
+   * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, all the file status from the
+   * directory will be preloaded.
+   *
+   * @param jars : the list of jars to upload
+   * @return a list of directories to be preloaded
+   * */
+  private[yarn] def directoriesToBePreloaded(jars: Seq[String]): List[URI] = {
+    val directoryCounter = new HashMap[URI, Int]()
+    jars.foreach { jar =>
+      if (!Utils.isLocalUri(jar) && !new GlobPattern(jar).hasWildcard) {
+        val parentUri = new Path(Utils.resolveURI(jar)).getParent.toUri
+        directoryCounter.update(parentUri, directoryCounter.getOrElse(parentUri, 0) + 1)
+      }
+    }
+    directoryCounter.filter(_._2 >= statCachePreloadDirectoryCountThreshold).keys.toList
+  }
+
+  /**
+   * Preload the statCache with file status. For each directory in the list, we will list all
+   * files from that directory and add them to the statCache.
+   *
+   * @param fsLookup: Function for looking up an FS based on a URI; override for testing
+   * @return A preloaded statCache with fileStatus
+   */
+  private[yarn] def getPreloadedStatCache(
+      fsLookup: URI => FileSystem = FileSystem.get(_, hadoopConf)): HashMap[URI, FileStatus] = {
+    val statCache = HashMap[URI, FileStatus]()
+    val jars = sparkConf.get(SPARK_JARS)
+    val directories = jars.map(directoriesToBePreloaded).getOrElse(ArrayBuffer.empty[URI])
+
+    directories.foreach { dir =>
+      fsLookup(dir).listStatus(new Path(dir)).filter(_.isFile())
+        .foreach { fileStatus =>
+          val uri = fileStatus.getPath.toUri
+          if (jars.exists(_.contains(uri.toString))) {
+            logDebug(s"Add ${uri} file status to statCache.")
+            statCache.put(uri, fileStatus)
+          }
+        }
+    }

Review Comment:
   Perhaps my comment was not very clear.
   You can test what I am referring to by checking for something with an unresolved path in the jars config - for example, something like `hdfs://host:123/a/b/c/../file`
   
   The `jars.exists(_.contains(uri.toString))` check will return false - and so this will not be cached - since `Path` always resolves the URI and so will return `/a/b/file` for the Path here.
   This pattern is quite common when users specify jars (use of `..`, `./`, etc. in the path)
   
   Btw, we have to update the tests in this PR to catch these types of paths as well.
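   
   A standalone snippet illustrating the mismatch (my example; it relies on the fact that the filesystem hands back the normalized path in `FileStatus`):
   
   ```scala
   import java.net.URI
   
   val raw = "hdfs://host:123/a/b/c/../file"  // as the user wrote it
   // normalize() collapses the "c/.." segment, like the resolved path
   // returned by listStatus for this file:
   val resolved = new URI(raw).normalize().toString
   // resolved == "hdfs://host:123/a/b/file"
   assert(!raw.contains(resolved))  // the string-containment check misses it
   ```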





[GitHub] [spark] mridulm commented on a diff in pull request #42357: [SPARK-44306][YARN] Group FileStatus with few RPC calls within Yarn Client

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #42357:
URL: https://github.com/apache/spark/pull/42357#discussion_r1318069261


##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -492,10 +492,8 @@ private[spark] class Client(
   private[yarn] def getPreloadedStatCache(
       fsLookup: URI => FileSystem = FileSystem.get(_, hadoopConf)): HashMap[URI, FileStatus] = {
     val statCache = HashMap[URI, FileStatus]()
-    val jars = sparkConf.get(SPARK_JARS)
-    val directoryToFiles = jars.map(directoriesToBePreloaded).getOrElse(HashMap.empty)
-
-    directoryToFiles.foreach { case (dir: URI, filesInDir: HashSet[String]) =>
+    val jars = sparkConf.get(JARS_TO_DISTRIBUTE)

Review Comment:
   (Edited comment):
   
   I am not sure I understood the comment :-)
   `statCache` is used within `prepareLocalResources` - within that method, it is used in the context of `SPARK_JARS` and the `distribute` method - so why use it only for `JARS_TO_DISTRIBUTE`?
   
   Or in other words, if it applies to all of these sequences - why only `JARS_TO_DISTRIBUTE` and not all relevant configs? (`SPARK_JARS`, `JARS_TO_DISTRIBUTE`, `FILES_TO_DISTRIBUTE`, `ARCHIVES_TO_DISTRIBUTE`, `pySparkArchives`, `PY_FILES`, etc.)
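   
   For concreteness, a sketch of gathering all of these before computing the directories to preload (my assumption of the shape, mirroring how the PR combines them elsewhere; `pySparkArchives` is the parameter of that name in `prepareLocalResources`):
   
   ```scala
   // SPARK_JARS is optional; the other entries default to empty sequences.
   val allResources: Seq[String] =
     sparkConf.get(SPARK_JARS).getOrElse(Nil) ++
       sparkConf.get(JARS_TO_DISTRIBUTE) ++
       sparkConf.get(FILES_TO_DISTRIBUTE) ++
       sparkConf.get(ARCHIVES_TO_DISTRIBUTE) ++
       sparkConf.get(PY_FILES) ++
       pySparkArchives
   ```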





[GitHub] [spark] mridulm commented on a diff in pull request #42357: [SPARK-44306][YARN] Group FileStatus with few RPC calls within Yarn Client

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #42357:
URL: https://github.com/apache/spark/pull/42357#discussion_r1318069261


##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -492,10 +492,8 @@ private[spark] class Client(
   private[yarn] def getPreloadedStatCache(
       fsLookup: URI => FileSystem = FileSystem.get(_, hadoopConf)): HashMap[URI, FileStatus] = {
     val statCache = HashMap[URI, FileStatus]()
-    val jars = sparkConf.get(SPARK_JARS)
-    val directoryToFiles = jars.map(directoriesToBePreloaded).getOrElse(HashMap.empty)
-
-    directoryToFiles.foreach { case (dir: URI, filesInDir: HashSet[String]) =>
+    val jars = sparkConf.get(JARS_TO_DISTRIBUTE)

Review Comment:
   I am not sure I understood the comment :-)
   `statCache` is used within `prepareLocalResources` - within that method, it is primarily used in the context of `SPARK_JARS` - so why change to a different config?
   
   Or in other words, if it applies to both - why only `JARS_TO_DISTRIBUTE` and not all relevant configs? (`SPARK_JARS`, `JARS_TO_DISTRIBUTE`, `FILES_TO_DISTRIBUTE`, `ARCHIVES_TO_DISTRIBUTE`)





[GitHub] [spark] shuwang21 commented on a diff in pull request #42357: [SPARK-44306][YARN] Group FileStatus with few RPC calls within Yarn Client

Posted by "shuwang21 (via GitHub)" <gi...@apache.org>.
shuwang21 commented on code in PR #42357:
URL: https://github.com/apache/spark/pull/42357#discussion_r1318074759


##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -492,10 +492,8 @@ private[spark] class Client(
   private[yarn] def getPreloadedStatCache(
       fsLookup: URI => FileSystem = FileSystem.get(_, hadoopConf)): HashMap[URI, FileStatus] = {
     val statCache = HashMap[URI, FileStatus]()
-    val jars = sparkConf.get(SPARK_JARS)
-    val directoryToFiles = jars.map(directoriesToBePreloaded).getOrElse(HashMap.empty)
-
-    directoryToFiles.foreach { case (dir: URI, filesInDir: HashSet[String]) =>
+    val jars = sparkConf.get(JARS_TO_DISTRIBUTE)

Review Comment:
   Yeah, actually I was thinking the same thing - maybe we should include all of those (`SPARK_JARS`, `JARS_TO_DISTRIBUTE`, `FILES_TO_DISTRIBUTE`, `ARCHIVES_TO_DISTRIBUTE`, `pySparkArchives`, `PY_FILES`).





[GitHub] [spark] venkata91 commented on a diff in pull request #42357: [SPARK-44306][YARN] Group FileStatus with few RPC calls within Yarn Client

Posted by "venkata91 (via GitHub)" <gi...@apache.org>.
venkata91 commented on code in PR #42357:
URL: https://github.com/apache/spark/pull/42357#discussion_r1292662552


##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala:
##########
@@ -462,6 +462,31 @@ package object config extends Logging {
     .stringConf
     .createWithDefault("yarn.io/fpga")
 
+  private[spark] val YARN_CLIENT_STAT_CACHE_PRELOAD_ENABLED =
+    ConfigBuilder("spark.yarn.client.statCache.preload.enabled")
+    .doc("This configuration enables statCache to be preloaded at YARN client side. This feature" +
+      " analyzes the pattern of resources paths. If multiple resources shared the same parent" +
+      " directory, a single <code>listStatus</code> will be invoked on the parent directory" +
+      " instead of multiple <code>getFileStatus</code> performed on each individual resources." +
+      " If most resources from a small set of directories, this can substantially improve job" +

Review Comment:
   nit: `most resources from a small` -> `most resources are from a small`



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala:
##########
@@ -462,6 +462,31 @@ package object config extends Logging {
     .stringConf
     .createWithDefault("yarn.io/fpga")
 
+  private[spark] val YARN_CLIENT_STAT_CACHE_PRELOAD_ENABLED =
+    ConfigBuilder("spark.yarn.client.statCache.preload.enabled")
+    .doc("This configuration enables statCache to be preloaded at YARN client side. This feature" +
+      " analyzes the pattern of resources paths. If multiple resources shared the same parent" +
+      " directory, a single <code>listStatus</code> will be invoked on the parent directory" +
+      " instead of multiple <code>getFileStatus</code> performed on each individual resources." +
+      " If most resources from a small set of directories, this can substantially improve job" +
+      " submission time. Noticing, this could potentially increase the memory overhead at client" +

Review Comment:
   `Noticing, this could potentially increase the memory overhead at client` -> `Note: Enabling this feature may potentially increase client memory overhead.`



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -485,7 +534,12 @@ private[spark] class Client(
     val localResources = HashMap[String, LocalResource]()
     FileSystem.mkdirs(fs, destDir, new FsPermission(STAGING_DIR_PERMISSION))
 
-    val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
+    // If preload is enabled, preload the statCache with the files in the directories
+    val statCache = if (statCachePreloadEnabled) {
+      getPreloadedStatCache()
+    } else {
+      HashMap[URI, FileStatus]()

Review Comment:
   nit: `Map.empty[URI, FileStatus]`?



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +461,52 @@ private[spark] class Client(
     new Path(resolvedDestDir, qualifiedDestPath.getName())
   }
 
+  /**
+   * For each non-local and non-glob resource, we will count its parent directory. If its

Review Comment:
   `Count the number of resources in each directory, and if it is greater than the spark.yarn.client.statCache.preloaded.perDirectoryThreshold value, all the resources in that directory will be preloaded`?



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala:
##########
@@ -462,6 +462,31 @@ package object config extends Logging {
     .stringConf
     .createWithDefault("yarn.io/fpga")
 
+  private[spark] val YARN_CLIENT_STAT_CACHE_PRELOAD_ENABLED =
+    ConfigBuilder("spark.yarn.client.statCache.preload.enabled")
+    .doc("This configuration enables statCache to be preloaded at YARN client side. This feature" +
+      " analyzes the pattern of resources paths. If multiple resources shared the same parent" +
+      " directory, a single <code>listStatus</code> will be invoked on the parent directory" +
+      " instead of multiple <code>getFileStatus</code> performed on each individual resources." +
+      " If most resources from a small set of directories, this can substantially improve job" +
+      " submission time. Noticing, this could potentially increase the memory overhead at client" +
+      " side.")
+    .version("4.0.0")
+    .booleanConf
+    .createWithDefault(false)
+
+  private[spark] val YARN_CLIENT_STAT_CACHE_PRELOAD_PER_DIRECTORY_THRESHOLD =
+    ConfigBuilder("spark.yarn.client.statCache.preload.perDirectoryThreshold")
+      .doc("This configuration defines the threshold for the number of resources in a directory" +

Review Comment:
   How about ```This configuration sets the resource count threshold that triggers statCache preloading. If the count of resources specified by <code>spark.yarn.jars</code> within a directory is equal to or greater than this threshold, the statCache preloading for that directory will activate. Note that this configuration only works when <code>spark.yarn.client.statCache.preloaded.enabled</code> is enabled.```?
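   
   As a usage note, both settings would be applied at submission time along these lines (a hypothetical example; the threshold value is arbitrary):
   
   ```scala
   import org.apache.spark.SparkConf
   
   val conf = new SparkConf()
     // Turn on statCache preloading at the YARN client.
     .set("spark.yarn.client.statCache.preload.enabled", "true")
     // Preload a directory once at least 5 of its resources are referenced.
     .set("spark.yarn.client.statCache.preload.perDirectoryThreshold", "5")
   ```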



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala:
##########
@@ -462,6 +462,31 @@ package object config extends Logging {
     .stringConf
     .createWithDefault("yarn.io/fpga")
 
+  private[spark] val YARN_CLIENT_STAT_CACHE_PRELOAD_ENABLED =
+    ConfigBuilder("spark.yarn.client.statCache.preload.enabled")
+    .doc("This configuration enables statCache to be preloaded at YARN client side. This feature" +

Review Comment:
   May be, ```Enabling this configuration preloads statCache at the YARN client side. It analyzes resource path patterns and invokes a single `fileSystem.listStatus` on a parent directory if multiple resources share the same directory. This can significantly improve job performance if most resources come from a small set of directories.```?



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +461,52 @@ private[spark] class Client(
     new Path(resolvedDestDir, qualifiedDestPath.getName())
   }
 
+  /**
+   * For each non-local and non-glob resource, we will count its parent directory. If its
+   * frequency is larger than the threshold specified by
+   * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, all the file status from the
+   * directory will be preloaded.
+   *
+   * @param jars : the list of jars to upload
+   * @return a list of directories to be preloaded
+   * */
+  private[yarn] def directoriesToBePreloaded(jars: Seq[String]): List[URI] = {
+    val directoryCounter = new HashMap[URI, Int]()
+    jars.foreach { jar =>
+      if (!Utils.isLocalUri(jar) && !new GlobPattern(jar).hasWildcard) {
+        val parentUri = new Path(Utils.resolveURI(jar)).getParent.toUri
+        directoryCounter.update(parentUri, directoryCounter.getOrElse(parentUri, 0) + 1)
+      }
+    }
+    directoryCounter.filter(_._2 >= statCachePreloadDirectoryCountThreshold).keys.toList
+  }
+
+  /**
+   * Preload the statCache with file status. For each directory in the list, we will list all

Review Comment:
   Instead of `we will list all files` -> `list all files`?



##########
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala:
##########
@@ -666,6 +666,42 @@ class ClientSuite extends SparkFunSuite with Matchers {
     assertUserClasspathUrls(cluster = true, replacementRootPath)
   }
 
+  test("SPARK-44306: test directoriesToBePreloaded") {
+    val sparkConf = new SparkConf()
+      .set(YARN_CLIENT_STAT_CACHE_PRELOAD_PER_DIRECTORY_THRESHOLD, 3)
+    val client = createClient(sparkConf, args = Array("--jar", USER))
+    val directories = client.directoriesToBePreloaded(Seq(
+      "hdfs:/valid/a.jar",
+      "hdfs:/valid/b.jar",
+      "hdfs:/valid/c.jar",
+      "s3:/valid/a.jar",
+      "s3:/valid/b.jar",
+      "s3:/valid/c.jar",
+      "hdfs:/glob/*",
+      "hdfs:/fewer/a.jar",
+      "hdfs:/fewer/b.jar",
+      "local:/local/a.jar",
+      "local:/local/b.jar",
+      "local:/local/c.jar"))
+    assert(directories.size == 2 && directories.contains(new URI("hdfs:/valid"))
+      && directories.contains(new URI("s3:/valid")))

Review Comment:
   nit: `!directories.contains(new URI("local:/local"))` - check local dirs are not added to `directories`?





[GitHub] [spark] shuwang21 commented on a diff in pull request #42357: [SPARK-44306][YARN] Group FileStatus with few RPC calls within Yarn Client

Posted by "shuwang21 (via GitHub)" <gi...@apache.org>.
shuwang21 commented on code in PR #42357:
URL: https://github.com/apache/spark/pull/42357#discussion_r1294172027


##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +461,52 @@ private[spark] class Client(
     new Path(resolvedDestDir, qualifiedDestPath.getName())
   }
 
+  /**
+   * For each non-local and non-glob resource, we will count its parent directory. If its
+   * frequency is larger than the threshold specified by
+   * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, all the file status from the
+   * directory will be preloaded.
+   *
+   * @param jars : the list of jars to upload
+   * @return a list of directories to be preloaded
+   * */
+  private[yarn] def directoriesToBePreloaded(jars: Seq[String]): List[URI] = {
+    val directoryCounter = new HashMap[URI, Int]()
+    jars.foreach { jar =>
+      if (!Utils.isLocalUri(jar) && !new GlobPattern(jar).hasWildcard) {
+        val parentUri = new Path(Utils.resolveURI(jar)).getParent.toUri
+        directoryCounter.update(parentUri, directoryCounter.getOrElse(parentUri, 0) + 1)
+      }
+    }
+    directoryCounter.filter(_._2 >= statCachePreloadDirectoryCountThreshold).keys.toList
+  }
+
+  /**
+   * Preload the statCache with file status. For each directory in the list, we will list all
+   * files from that directory and add them to the statCache.
+   *
+   * @param fsLookup: Function for looking up an FS based on a URI; override for testing
+   * @return A preloaded statCache with fileStatus
+   */
+  private[yarn] def getPreloadedStatCache(
+      fsLookup: URI => FileSystem = FileSystem.get(_, hadoopConf)): HashMap[URI, FileStatus] = {
+    val statCache = HashMap[URI, FileStatus]()
+    val jars = sparkConf.get(SPARK_JARS)
+    val directories = jars.map(directoriesToBePreloaded).getOrElse(ArrayBuffer.empty[URI])
+
+    directories.foreach { dir =>
+      fsLookup(dir).listStatus(new Path(dir)).filter(_.isFile())
+        .foreach { fileStatus =>
+          val uri = fileStatus.getPath.toUri
+          if (jars.exists(_.contains(uri.toString))) {
+            logDebug(s"Add ${uri} file status to statCache.")
+            statCache.put(uri, fileStatus)
+          }
+        }
+    }

Review Comment:
   Nice catch! Please take another look.





[GitHub] [spark] mridulm commented on a diff in pull request #42357: [SPARK-44306][YARN] Group FileStatus with few RPC calls within Yarn Client

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #42357:
URL: https://github.com/apache/spark/pull/42357#discussion_r1295351248


##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +461,51 @@ private[spark] class Client(
     new Path(resolvedDestDir, qualifiedDestPath.getName())
   }
 
+  /**
+   * For each non-local and non-glob resource, we will count its parent directory. If its
+   * frequency is larger than the threshold specified by
+   * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, the corresponding file status
+   * from the directory will be preloaded.
+   *
+   * @param jars : the list of jars to upload
+   * @return a hashmap contains directories to be preloaded and all file names in that directory
+   * */

Review Comment:
   nit:
   ```suggestion
      */
   ```



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +461,51 @@ private[spark] class Client(
     new Path(resolvedDestDir, qualifiedDestPath.getName())
   }
 
+  /**
+   * For each non-local and non-glob resource, we will count its parent directory. If its
+   * frequency is larger than the threshold specified by
+   * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, the corresponding file status
+   * from the directory will be preloaded.
+   *
+   * @param jars : the list of jars to upload
+   * @return a hashmap contains directories to be preloaded and all file names in that directory
+   * */
+  private[yarn] def directoriesToBePreloaded(jars: Seq[String]): HashMap[URI, Set[String]] = {
+    val directoryToFiles = new HashMap[URI, Set[String]]()
+    jars.foreach { jar =>
+      if (!Utils.isLocalUri(jar) && !new GlobPattern(jar).hasWildcard) {
+        val currentUri = Utils.resolveURI(jar)
+        val parentUri = new Path(currentUri).getParent.toUri
+        directoryToFiles.update(parentUri, directoryToFiles.getOrElse(parentUri, Set.empty)
+          .union(Set(currentUri.normalize().toString)))

Review Comment:
   It would be simpler to do something like:
   
   ```suggestion
           val currentPath = new Path(Utils.resolveURI(jar))
           val parentUri = currentPath.getParent.toUri
           directoryToFiles.getOrElseUpdate(parentUri, new HashSet[String]()) += currentPath.getName
   ```
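   
   As a side note on the suggested pattern (a self-contained illustration, not PR code): `getOrElseUpdate` creates the set on first access and returns the existing one afterwards, so `+=` mutates the map's value in place:
   
   ```scala
   import scala.collection.mutable.{HashMap, HashSet}
   
   val directoryToFiles = new HashMap[String, HashSet[String]]()
   directoryToFiles.getOrElseUpdate("hdfs:/dir", new HashSet[String]()) += "a.jar"
   directoryToFiles.getOrElseUpdate("hdfs:/dir", new HashSet[String]()) += "b.jar"
   // directoryToFiles("hdfs:/dir") == HashSet("a.jar", "b.jar")
   ```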
   
   



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala:
##########
@@ -462,6 +462,30 @@ package object config extends Logging {
     .stringConf
     .createWithDefault("yarn.io/fpga")
 
+  private[spark] val YARN_CLIENT_STAT_CACHE_PRELOAD_ENABLED =
+    ConfigBuilder("spark.yarn.client.statCache.preload.enabled")
+    .doc("This configuration enables statCache to be preloaded at YARN client side. This feature" +
+      " analyzes the pattern of resources paths. If multiple resources shared the same parent" +

Review Comment:
   nit:
   ```suggestion
         " analyzes the pattern of resources paths, and if multiple resources shared the same parent" +
   ```



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +461,51 @@ private[spark] class Client(
     new Path(resolvedDestDir, qualifiedDestPath.getName())
   }
 
+  /**
+   * For each non-local and non-glob resource, we will count its parent directory. If its
+   * frequency is larger than the threshold specified by
+   * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, the corresponding file status
+   * from the directory will be preloaded.
+   *
+   * @param jars : the list of jars to upload
+   * @return a hashmap contains directories to be preloaded and all file names in that directory
+   * */
+  private[yarn] def directoriesToBePreloaded(jars: Seq[String]): HashMap[URI, Set[String]] = {
+    val directoryToFiles = new HashMap[URI, Set[String]]()
+    jars.foreach { jar =>
+      if (!Utils.isLocalUri(jar) && !new GlobPattern(jar).hasWildcard) {
+        val currentUri = Utils.resolveURI(jar)
+        val parentUri = new Path(currentUri).getParent.toUri
+        directoryToFiles.update(parentUri, directoryToFiles.getOrElse(parentUri, Set.empty)
+          .union(Set(currentUri.normalize().toString)))
+      }
+    }
+    directoryToFiles.filter(_._2.size >= statCachePreloadDirectoryCountThreshold)
+  }
+
+  /**
+   * Preload the statCache with file status. List all files from that directory and add them to the
+   * statCache.
+   *
+   * @param fsLookup: Function for looking up an FS based on a URI; override for testing
+   * @return A preloaded statCache with fileStatus
+   */
+  private[yarn] def getPreloadedStatCache(
+      fsLookup: URI => FileSystem = FileSystem.get(_, hadoopConf)): HashMap[URI, FileStatus] = {
+    val statCache = HashMap[URI, FileStatus]()
+    val jars = sparkConf.get(SPARK_JARS)
+    val directoryToFiles = jars.map(directoriesToBePreloaded).getOrElse(HashMap.empty)
+
+    directoryToFiles.foreach { case (dir: URI, filesInDir: Set[String]) =>
+      fsLookup(dir).listStatus(new Path(dir)).filter(_.isFile()).
+        filter(f => filesInDir.contains(f.getPath.toString)).foreach { fileStatus =>

Review Comment:
   This would become:
   
   ```suggestion
           filter(f => filesInDir.contains(f.getPath.getName)).foreach { fileStatus =>
   ```
   
   once we make the change above.



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala:
##########
@@ -462,6 +462,30 @@ package object config extends Logging {
     .stringConf
     .createWithDefault("yarn.io/fpga")
 
+  private[spark] val YARN_CLIENT_STAT_CACHE_PRELOAD_ENABLED =
+    ConfigBuilder("spark.yarn.client.statCache.preload.enabled")
+    .doc("This configuration enables statCache to be preloaded at YARN client side. This feature" +
+      " analyzes the pattern of resources paths. If multiple resources shared the same parent" +
+      " directory, a single <code>listStatus</code> will be invoked on the parent directory" +
+      " instead of multiple <code>getFileStatus</code> performed on each individual resources." +
+      " If most resources from a small set of directories, this can substantially improve job" +
+      " submission time. Enabling this feature may potentially increase client memory overhead.")
+    .version("4.0.0")
+    .booleanConf
+    .createWithDefault(false)
+
+  private[spark] val YARN_CLIENT_STAT_CACHE_PRELOAD_PER_DIRECTORY_THRESHOLD =
+    ConfigBuilder("spark.yarn.client.statCache.preload.perDirectoryThreshold")
+      .doc("This configuration defines the threshold for the number of resources in a directory" +
+        " that triggers the activation of the statCache preloading. When the count of individual" +

Review Comment:
   nit: For both the doc notes, let us be consistent with the convention in the other doc notes in this file w.r.t starting space.
   For example:
   ```suggestion
         .doc("This configuration defines the threshold for the number of resources in a directory " +
           "that triggers the activation of the statCache preloading. When the count of individual " +
           <snip>
   ```



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +461,51 @@ private[spark] class Client(
     new Path(resolvedDestDir, qualifiedDestPath.getName())
   }
 
+  /**
+   * For each non-local and non-glob resource, we will count its parent directory. If its
+   * frequency is larger than the threshold specified by
+   * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, the corresponding file status
+   * from the directory will be preloaded.
+   *
+   * @param jars : the list of jars to upload
+   * @return a hashmap contains directories to be preloaded and all file names in that directory
+   * */
+  private[yarn] def directoriesToBePreloaded(jars: Seq[String]): HashMap[URI, Set[String]] = {
+    val directoryToFiles = new HashMap[URI, Set[String]]()
+    jars.foreach { jar =>
+      if (!Utils.isLocalUri(jar) && !new GlobPattern(jar).hasWildcard) {
+        val currentUri = Utils.resolveURI(jar)
+        val parentUri = new Path(currentUri).getParent.toUri
+        directoryToFiles.update(parentUri, directoryToFiles.getOrElse(parentUri, Set.empty)
+          .union(Set(currentUri.normalize().toString)))
+      }
+    }
+    directoryToFiles.filter(_._2.size >= statCachePreloadDirectoryCountThreshold)
+  }
+
+  /**
+   * Preload the statCache with file status. List all files from that directory and add them to the
+   * statCache.
+   *
+   * @param fsLookup: Function for looking up an FS based on a URI; override for testing
+   * @return A preloaded statCache with fileStatus
+   */
+  private[yarn] def getPreloadedStatCache(
+      fsLookup: URI => FileSystem = FileSystem.get(_, hadoopConf)): HashMap[URI, FileStatus] = {
+    val statCache = HashMap[URI, FileStatus]()
+    val jars = sparkConf.get(SPARK_JARS)
+    val directoryToFiles = jars.map(directoriesToBePreloaded).getOrElse(HashMap.empty)
+
+    directoryToFiles.foreach { case (dir: URI, filesInDir: Set[String]) =>
+      fsLookup(dir).listStatus(new Path(dir)).filter(_.isFile()).
+        filter(f => filesInDir.contains(f.getPath.toString)).foreach { fileStatus =>
+          val uri = fileStatus.getPath.toUri
+          statCache.put(uri, fileStatus)

Review Comment:
   QQ: Since we are loading the `FileStatus` for the jar, don't we also need the parent's `FileStatus`? Cache that as well?





[GitHub] [spark] shuwang21 commented on a diff in pull request #42357: [SPARK-44306][YARN] Group FileStatus with few RPC calls within Yarn Client

Posted by "shuwang21 (via GitHub)" <gi...@apache.org>.
shuwang21 commented on code in PR #42357:
URL: https://github.com/apache/spark/pull/42357#discussion_r1292891976


##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +461,52 @@ private[spark] class Client(
     new Path(resolvedDestDir, qualifiedDestPath.getName())
   }
 
+  /**
+   * For each non-local and non-glob resource, we will count its parent directory. If its

Review Comment:
   Thanks. I would say `non-local and non-glob` is very important here.
   
   1. For local resources, no RPC call is invoked. All communication is local, which has less overhead.
   2. For glob resources, the corresponding file status will be obtained from `val fss = pathFs.globStatus(path)`.
   3. Finally, with Erik's suggestion, we return all files from the directory and then filter out those files that are not from the `--jars` configuration.
   4. We only preload the file status of the resources instead of the resources themselves.
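   
   A minimal sketch of the filtering behind points 1 and 2 (the helpers are the ones the PR already uses; the wrapper method is illustrative):
   
   ```scala
   import org.apache.hadoop.fs.GlobPattern
   import org.apache.spark.util.Utils
   
   // Keep only resources whose stat needs an RPC: drop local: URIs (no RPC
   // involved) and globs (already resolved via pathFs.globStatus elsewhere).
   def preloadCandidates(resources: Seq[String]): Seq[String] =
     resources.filter { r =>
       !Utils.isLocalUri(r) && !new GlobPattern(r).hasWildcard
     }
   ```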
   





[GitHub] [spark] mridulm commented on a diff in pull request #42357: [SPARK-44306][YARN] Group FileStatus with few RPC calls within Yarn Client

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #42357:
URL: https://github.com/apache/spark/pull/42357#discussion_r1318069261


##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -492,10 +492,8 @@ private[spark] class Client(
   private[yarn] def getPreloadedStatCache(
       fsLookup: URI => FileSystem = FileSystem.get(_, hadoopConf)): HashMap[URI, FileStatus] = {
     val statCache = HashMap[URI, FileStatus]()
-    val jars = sparkConf.get(SPARK_JARS)
-    val directoryToFiles = jars.map(directoriesToBePreloaded).getOrElse(HashMap.empty)
-
-    directoryToFiles.foreach { case (dir: URI, filesInDir: HashSet[String]) =>
+    val jars = sparkConf.get(JARS_TO_DISTRIBUTE)

Review Comment:
   I am not sure I understood the comment :-)
   `statCache` is used within `prepareLocalResources` - within that method, it is primarily used in the context of `SPARK_JARS` - so why change to a different config?
   
   Or in other words, if it applies to both - why only `JARS_TO_DISTRIBUTE` and not all relevant configs? (`SPARK_JARS`, `JARS_TO_DISTRIBUTE`, `FILES_TO_DISTRIBUTE`, `ARCHIVES_TO_DISTRIBUTE`, `pySparkArchives`)



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -492,10 +492,8 @@ private[spark] class Client(
   private[yarn] def getPreloadedStatCache(
       fsLookup: URI => FileSystem = FileSystem.get(_, hadoopConf)): HashMap[URI, FileStatus] = {
     val statCache = HashMap[URI, FileStatus]()
-    val jars = sparkConf.get(SPARK_JARS)
-    val directoryToFiles = jars.map(directoriesToBePreloaded).getOrElse(HashMap.empty)
-
-    directoryToFiles.foreach { case (dir: URI, filesInDir: HashSet[String]) =>
+    val jars = sparkConf.get(JARS_TO_DISTRIBUTE)

Review Comment:
   I am not sure I understood the comment :-)
   `statCache` is used within `prepareLocalResources` - within that method, it is primarily used in the context of `SPARK_JARS` - so why change to a different config?
   
   Or in other words, if it applies to both - why only `JARS_TO_DISTRIBUTE` and not all relevant configs? (`SPARK_JARS`, `JARS_TO_DISTRIBUTE`, `FILES_TO_DISTRIBUTE`, `ARCHIVES_TO_DISTRIBUTE`, `pySparkArchives`, `PY_FILES`)





[GitHub] [spark] mridulm commented on a diff in pull request #42357: [SPARK-44306][YARN] Group FileStatus with few RPC calls within Yarn Client

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #42357:
URL: https://github.com/apache/spark/pull/42357#discussion_r1318069261


##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -492,10 +492,8 @@ private[spark] class Client(
   private[yarn] def getPreloadedStatCache(
       fsLookup: URI => FileSystem = FileSystem.get(_, hadoopConf)): HashMap[URI, FileStatus] = {
     val statCache = HashMap[URI, FileStatus]()
-    val jars = sparkConf.get(SPARK_JARS)
-    val directoryToFiles = jars.map(directoriesToBePreloaded).getOrElse(HashMap.empty)
-
-    directoryToFiles.foreach { case (dir: URI, filesInDir: HashSet[String]) =>
+    val jars = sparkConf.get(JARS_TO_DISTRIBUTE)

Review Comment:
   I am not sure I understood the comment :-)
   `statCache` is used within `prepareLocalResources` - within that method, it is primarily used in the context of `SPARK_JARS` - so why change to a different config?
   
   Or in other words, if it applies to both - why only `JARS_TO_DISTRIBUTE`?





[GitHub] [spark] shuwang21 commented on a diff in pull request #42357: [SPARK-44306][YARN] Group FileStatus with few RPC calls within Yarn Client

Posted by "shuwang21 (via GitHub)" <gi...@apache.org>.
shuwang21 commented on code in PR #42357:
URL: https://github.com/apache/spark/pull/42357#discussion_r1318067644


##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -492,10 +492,8 @@ private[spark] class Client(
   private[yarn] def getPreloadedStatCache(
       fsLookup: URI => FileSystem = FileSystem.get(_, hadoopConf)): HashMap[URI, FileStatus] = {
     val statCache = HashMap[URI, FileStatus]()
-    val jars = sparkConf.get(SPARK_JARS)
-    val directoryToFiles = jars.map(directoriesToBePreloaded).getOrElse(HashMap.empty)
-
-    directoryToFiles.foreach { case (dir: URI, filesInDir: HashSet[String]) =>
+    val jars = sparkConf.get(JARS_TO_DISTRIBUTE)

Review Comment:
   `JARS_TO_DISTRIBUTE` = ConfigBuilder("spark.yarn.dist.jars")
   `SPARK_JARS` = ConfigBuilder("spark.yarn.jars")
   
   So most user-provided jars come from `JARS_TO_DISTRIBUTE`.


