Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/10/08 19:01:41 UTC

[GitHub] [spark] holdenk commented on a change in pull request #29959: [WIP][SPARK-32381][CORE][SQL][FOLLOWUP] More cleanup on HadoopFSUtils

holdenk commented on a change in pull request #29959:
URL: https://github.com/apache/spark/pull/29959#discussion_r501927461



##########
File path: core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
##########
@@ -57,11 +50,22 @@ private[spark] object HadoopFSUtils extends Logging {
    * @param parallelismMax The maximum parallelism for listing. If the number of input paths is
    *                       larger than this value, parallelism will be throttled to this value
    *                       to avoid generating too many tasks.
-   * @param filterFun Optional predicate on the leaf files. Files who failed the check will be
-   *                  excluded from the results
    * @return for each input path, the set of discovered files for the path
    */
   def parallelListLeafFiles(
+    sc: SparkContext,
+    paths: Seq[Path],
+    hadoopConf: Configuration,
+    filter: PathFilter,
+    ignoreMissingFiles: Boolean,
+    ignoreLocality: Boolean,
+    parallelismThreshold: Int,
+    parallelismMax: Int): Seq[(Path, Seq[FileStatus])] = {
+    parallelListLeafFilesInternal(sc, paths, hadoopConf, filter, true, ignoreMissingFiles,

Review comment:
       nit: For readability I'd pass this as a named parameter, since a bare boolean isn't very clear.
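
       A rough sketch of what I mean, assuming the bare boolean corresponds to a parameter named `isRootLevel` in `parallelListLeafFilesInternal` (the actual parameter name may differ):

           parallelListLeafFilesInternal(
             sc,
             paths,
             hadoopConf,
             filter,
             isRootLevel = true,  // named, so readers can tell what the bare `true` controls
             ignoreMissingFiles = ignoreMissingFiles,
             ignoreLocality = ignoreLocality,
             parallelismThreshold = parallelismThreshold,
             parallelismMax = parallelismMax)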

##########
File path: core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
##########
@@ -207,18 +166,14 @@ private[spark] object HadoopFSUtils extends Logging {
     // Note that statuses only include FileStatus for the files and dirs directly under path,
     // and does not include anything else recursively.
     val statuses: Array[FileStatus] = try {
-      fs match {
-        // DistributedFileSystem overrides listLocatedStatus to make 1 single call to namenode
-        // to retrieve the file status with the file block location. The reason to still fallback
-        // to listStatus is because the default implementation would potentially throw a
-        // FileNotFoundException which is better handled by doing the lookups manually below.
-        case (_: DistributedFileSystem | _: ViewFileSystem) if !ignoreLocality =>
-          val remoteIter = fs.listLocatedStatus(path)
-          new Iterator[LocatedFileStatus]() {
-            def next(): LocatedFileStatus = remoteIter.next
-            def hasNext(): Boolean = remoteIter.hasNext
-          }.toArray
-        case _ => fs.listStatus(path)
+      if (ignoreLocality) {
+        fs.listStatus(path)
+      } else {
+        val remoteIter = fs.listLocatedStatus(path)

Review comment:
       Is there a chance an FS won't have this implemented, as per the previous code's comment?
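
       For reference, a rough, untested sketch of keeping the old fallback alongside the new `ignoreLocality` check, adapted from the code this change removes (not a drop-in replacement):

           val statuses: Array[FileStatus] =
             if (ignoreLocality) {
               fs.listStatus(path)
             } else {
               fs match {
                 // These implementations override listLocatedStatus to make a single
                 // namenode call; the default implementation may instead throw a
                 // FileNotFoundException that has to be handled per file.
                 case _: DistributedFileSystem | _: ViewFileSystem =>
                   val remoteIter = fs.listLocatedStatus(path)
                   new Iterator[LocatedFileStatus]() {
                     def next(): LocatedFileStatus = remoteIter.next
                     def hasNext(): Boolean = remoteIter.hasNext
                   }.toArray
                 case _ => fs.listStatus(path)
               }
             }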

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
##########
@@ -214,9 +214,9 @@ class FileIndexSuite extends SharedSparkSession {
               assert(leafFiles.isEmpty)
             } else {
               assert(raceCondition == classOf[FileDeletionRaceFileSystem])
-              // One of the two leaf files was missing, but we should still list the other:
-              assert(leafFiles.size == 1)
-              assert(leafFiles.head.getPath == nonDeletedLeafFilePath)
+              // listLocatedStatus will fail as a whole because the default impl calls
+              // getFileBlockLocations
+              assert(leafFiles.isEmpty)

Review comment:
       This seems to indicate the change needs some work.
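
       For context, a rough sketch (illustrative names, not the actual code) of the per-file lookup the old path relied on, where a single deleted file is skipped instead of failing the whole listing:

           val located = statuses.flatMap { f =>
             try {
               // Look up block locations file by file;
               // LocatedFileStatus(FileStatus, Array[BlockLocation]) is a standard Hadoop constructor.
               Some(new LocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen)))
             } catch {
               // A file deleted between listing and lookup shouldn't fail the other files.
               case _: FileNotFoundException => None
             }
           }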




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org