You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ho...@apache.org on 2020/11/18 20:40:15 UTC

[spark] branch master updated: [SPARK-32381][CORE][SQL][FOLLOWUP] More cleanup on HadoopFSUtils

This is an automated email from the ASF dual-hosted git repository.

holden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 27cd945  [SPARK-32381][CORE][SQL][FOLLOWUP] More cleanup on HadoopFSUtils
27cd945 is described below

commit 27cd945c151dccb5ac863e6bc2c4f5b2c6a6d996
Author: Chao Sun <su...@apple.com>
AuthorDate: Wed Nov 18 12:39:00 2020 -0800

    [SPARK-32381][CORE][SQL][FOLLOWUP] More cleanup on HadoopFSUtils
    
    ### What changes were proposed in this pull request?
    
    This PR is a follow-up of #29471 and does the following improvements for `HadoopFSUtils`:
    1. Removes the extra `filterFun` from the listing API and combines it with the `filter`.
    2. Removes `SerializableBlockLocation` and `SerializableFileStatus` given that `BlockLocation` and `FileStatus` are already serializable.
    3. Hides the `isRootLevel` flag from the top-level API.
    
    ### Why are the changes needed?
    
    Main purpose is to simplify the logic within `HadoopFSUtils` as well as cleanup the API.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing unit tests (e.g., `FileIndexSuite`)
    
    Closes #29959 from sunchao/hadoop-fs-utils-followup.
    
    Authored-by: Chao Sun <su...@apple.com>
    Signed-off-by: Holden Karau <hk...@apple.com>
---
 .../org/apache/spark/util/HadoopFSUtils.scala      | 104 ++++-----------------
 .../spark/sql/execution/command/CommandUtils.scala |   2 +-
 .../execution/datasources/InMemoryFileIndex.scala  |  19 ++--
 3 files changed, 31 insertions(+), 94 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
index c0a135e..a3a528c 100644
--- a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
@@ -27,7 +27,6 @@ import org.apache.hadoop.fs.viewfs.ViewFileSystem
 import org.apache.hadoop.hdfs.DistributedFileSystem
 
 import org.apache.spark._
-import org.apache.spark.annotation.Private
 import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.source.HiveCatalogMetrics
 
@@ -45,8 +44,6 @@ private[spark] object HadoopFSUtils extends Logging {
    * @param paths Input paths to list
    * @param hadoopConf Hadoop configuration
    * @param filter Path filter used to exclude leaf files from result
-   * @param isRootLevel Whether the input paths are at the root level, i.e., they are the root
-   *                    paths as opposed to nested paths encountered during recursive calls of this.
    * @param ignoreMissingFiles Ignore missing files that happen during recursive listing
    *                           (e.g., due to race conditions)
    * @param ignoreLocality Whether to fetch data locality info when listing leaf files. If false,
@@ -57,11 +54,22 @@ private[spark] object HadoopFSUtils extends Logging {
    * @param parallelismMax The maximum parallelism for listing. If the number of input paths is
    *                       larger than this value, parallelism will be throttled to this value
    *                       to avoid generating too many tasks.
-   * @param filterFun Optional predicate on the leaf files. Files who failed the check will be
-   *                  excluded from the results
    * @return for each input path, the set of discovered files for the path
    */
   def parallelListLeafFiles(
+    sc: SparkContext,
+    paths: Seq[Path],
+    hadoopConf: Configuration,
+    filter: PathFilter,
+    ignoreMissingFiles: Boolean,
+    ignoreLocality: Boolean,
+    parallelismThreshold: Int,
+    parallelismMax: Int): Seq[(Path, Seq[FileStatus])] = {
+    parallelListLeafFilesInternal(sc, paths, hadoopConf, filter, isRootLevel = true,
+      ignoreMissingFiles, ignoreLocality, parallelismThreshold, parallelismMax)
+  }
+
+  private def parallelListLeafFilesInternal(
       sc: SparkContext,
       paths: Seq[Path],
       hadoopConf: Configuration,
@@ -70,8 +78,7 @@ private[spark] object HadoopFSUtils extends Logging {
       ignoreMissingFiles: Boolean,
       ignoreLocality: Boolean,
       parallelismThreshold: Int,
-      parallelismMax: Int,
-      filterFun: Option[String => Boolean] = None): Seq[(Path, Seq[FileStatus])] = {
+      parallelismMax: Int): Seq[(Path, Seq[FileStatus])] = {
 
     // Short-circuits parallel listing when serial listing is likely to be faster.
     if (paths.size <= parallelismThreshold) {
@@ -85,8 +92,7 @@ private[spark] object HadoopFSUtils extends Logging {
           ignoreLocality = ignoreLocality,
           isRootPath = isRootLevel,
           parallelismThreshold = parallelismThreshold,
-          parallelismMax = parallelismMax,
-          filterFun = filterFun)
+          parallelismMax = parallelismMax)
         (path, leafFiles)
       }
     }
@@ -126,58 +132,16 @@ private[spark] object HadoopFSUtils extends Logging {
               ignoreMissingFiles = ignoreMissingFiles,
               ignoreLocality = ignoreLocality,
               isRootPath = isRootLevel,
-              filterFun = filterFun,
               parallelismThreshold = Int.MaxValue,
               parallelismMax = 0)
             (path, leafFiles)
           }.iterator
-        }.map { case (path, statuses) =>
-            val serializableStatuses = statuses.map { status =>
-              // Turn FileStatus into SerializableFileStatus so we can send it back to the driver
-              val blockLocations = status match {
-                case f: LocatedFileStatus =>
-                  f.getBlockLocations.map { loc =>
-                    SerializableBlockLocation(
-                      loc.getNames,
-                      loc.getHosts,
-                      loc.getOffset,
-                      loc.getLength)
-                  }
-
-                case _ =>
-                  Array.empty[SerializableBlockLocation]
-              }
-
-              SerializableFileStatus(
-                status.getPath.toString,
-                status.getLen,
-                status.isDirectory,
-                status.getReplication,
-                status.getBlockSize,
-                status.getModificationTime,
-                status.getAccessTime,
-                blockLocations)
-            }
-            (path.toString, serializableStatuses)
         }.collect()
     } finally {
       sc.setJobDescription(previousJobDescription)
     }
 
-    // turn SerializableFileStatus back to Status
-    statusMap.map { case (path, serializableStatuses) =>
-      val statuses = serializableStatuses.map { f =>
-        val blockLocations = f.blockLocations.map { loc =>
-          new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length)
-        }
-        new LocatedFileStatus(
-          new FileStatus(
-            f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime,
-            new Path(f.path)),
-          blockLocations)
-      }
-      (new Path(path), statuses)
-    }
+    statusMap.toSeq
   }
 
   // scalastyle:off argcount
@@ -197,7 +161,6 @@ private[spark] object HadoopFSUtils extends Logging {
       ignoreMissingFiles: Boolean,
       ignoreLocality: Boolean,
       isRootPath: Boolean,
-      filterFun: Option[String => Boolean],
       parallelismThreshold: Int,
       parallelismMax: Int): Seq[FileStatus] = {
 
@@ -245,19 +208,11 @@ private[spark] object HadoopFSUtils extends Logging {
         Array.empty[FileStatus]
     }
 
-    def doFilter(statuses: Array[FileStatus]) = filterFun match {
-      case Some(shouldFilterOut) =>
-        statuses.filterNot(status => shouldFilterOut(status.getPath.getName))
-      case None =>
-        statuses
-    }
-
-    val filteredStatuses = doFilter(statuses)
     val allLeafStatuses = {
-      val (dirs, topLevelFiles) = filteredStatuses.partition(_.isDirectory)
+      val (dirs, topLevelFiles) = statuses.partition(_.isDirectory)
       val nestedFiles: Seq[FileStatus] = contextOpt match {
         case Some(context) if dirs.size > parallelismThreshold =>
-          parallelListLeafFiles(
+          parallelListLeafFilesInternal(
             context,
             dirs.map(_.getPath),
             hadoopConf = hadoopConf,
@@ -265,7 +220,6 @@ private[spark] object HadoopFSUtils extends Logging {
             isRootLevel = false,
             ignoreMissingFiles = ignoreMissingFiles,
             ignoreLocality = ignoreLocality,
-            filterFun = filterFun,
             parallelismThreshold = parallelismThreshold,
             parallelismMax = parallelismMax
           ).flatMap(_._2)
@@ -279,7 +233,6 @@ private[spark] object HadoopFSUtils extends Logging {
               ignoreMissingFiles = ignoreMissingFiles,
               ignoreLocality = ignoreLocality,
               isRootPath = false,
-              filterFun = filterFun,
               parallelismThreshold = parallelismThreshold,
               parallelismMax = parallelismMax)
           }
@@ -289,8 +242,7 @@ private[spark] object HadoopFSUtils extends Logging {
     }
 
     val missingFiles = mutable.ArrayBuffer.empty[String]
-    val filteredLeafStatuses = doFilter(allLeafStatuses)
-    val resolvedLeafStatuses = filteredLeafStatuses.flatMap {
+    val resolvedLeafStatuses = allLeafStatuses.flatMap {
       case f: LocatedFileStatus =>
         Some(f)
 
@@ -339,22 +291,4 @@ private[spark] object HadoopFSUtils extends Logging {
     resolvedLeafStatuses
   }
   // scalastyle:on argcount
-
-  /** A serializable variant of HDFS's BlockLocation. */
-  private case class SerializableBlockLocation(
-    names: Array[String],
-    hosts: Array[String],
-    offset: Long,
-    length: Long)
-
-  /** A serializable variant of HDFS's FileStatus. */
-  private case class SerializableFileStatus(
-    path: String,
-    length: Long,
-    isDir: Boolean,
-    blockReplication: Short,
-    blockSize: Long,
-    modificationTime: Long,
-    accessTime: Long,
-    blockLocations: Array[SerializableBlockLocation])
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
index 8bf7504..6495463 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
@@ -163,7 +163,7 @@ object CommandUtils extends Logging {
       .getConfString("hive.exec.stagingdir", ".hive-staging")
     val filter = new PathFilterIgnoreNonData(stagingDir)
     val sizes = InMemoryFileIndex.bulkListLeafFiles(paths.flatten,
-      sparkSession.sessionState.newHadoopConf(), filter, sparkSession, isRootLevel = true).map {
+      sparkSession.sessionState.newHadoopConf(), filter, sparkSession).map {
       case (_, files) => files.map(_.getLen).sum
     }
     // the size is 0 where paths(i) is not defined and sizes(i) where it is defined
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index 130894e..2127595 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -128,7 +128,7 @@ class InMemoryFileIndex(
     }
     val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass))
     val discovered = InMemoryFileIndex.bulkListLeafFiles(
-      pathsToFetch.toSeq, hadoopConf, filter, sparkSession, isRootLevel = true)
+      pathsToFetch.toSeq, hadoopConf, filter, sparkSession)
     discovered.foreach { case (path, leafFiles) =>
       HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size)
       fileStatusCache.putLeafFiles(path, leafFiles.toArray)
@@ -146,20 +146,17 @@ object InMemoryFileIndex extends Logging {
       paths: Seq[Path],
       hadoopConf: Configuration,
       filter: PathFilter,
-      sparkSession: SparkSession,
-      isRootLevel: Boolean): Seq[(Path, Seq[FileStatus])] = {
+      sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = {
     HadoopFSUtils.parallelListLeafFiles(
       sc = sparkSession.sparkContext,
       paths = paths,
       hadoopConf = hadoopConf,
-      filter = filter,
-      isRootLevel = isRootLevel,
+      filter = new PathFilterWrapper(filter),
       ignoreMissingFiles = sparkSession.sessionState.conf.ignoreMissingFiles,
       ignoreLocality = sparkSession.sessionState.conf.ignoreDataLocality,
       parallelismThreshold = sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold,
-      parallelismMax = sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism,
-      filterFun = Some(shouldFilterOut))
- }
+      parallelismMax = sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism)
+  }
 
   /** Checks if we should filter out this path name. */
   def shouldFilterOut(pathName: String): Boolean = {
@@ -175,3 +172,9 @@ object InMemoryFileIndex extends Logging {
     exclude && !include
   }
 }
+
+private class PathFilterWrapper(val filter: PathFilter) extends PathFilter with Serializable {
+  override def accept(path: Path): Boolean = {
+    (filter == null || filter.accept(path)) && !InMemoryFileIndex.shouldFilterOut(path.getName)
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org