Posted to commits@spark.apache.org by ho...@apache.org on 2020/11/18 20:40:15 UTC
[spark] branch master updated: [SPARK-32381][CORE][SQL][FOLLOWUP] More cleanup on HadoopFSUtils
This is an automated email from the ASF dual-hosted git repository.
holden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 27cd945 [SPARK-32381][CORE][SQL][FOLLOWUP] More cleanup on HadoopFSUtils
27cd945 is described below
commit 27cd945c151dccb5ac863e6bc2c4f5b2c6a6d996
Author: Chao Sun <su...@apple.com>
AuthorDate: Wed Nov 18 12:39:00 2020 -0800
[SPARK-32381][CORE][SQL][FOLLOWUP] More cleanup on HadoopFSUtils
### What changes were proposed in this pull request?
This PR is a follow-up of #29471 and makes the following improvements to `HadoopFSUtils`:
1. Removes the extra `filterFun` from the listing API and combines it into the single `filter` argument (see the sketch after this list).
2. Removes `SerializableBlockLocation` and `SerializableFileStatus` given that `BlockLocation` and `FileStatus` are already serializable.
3. Hides the `isRootLevel` flag from the top-level API.
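For illustration, here is a minimal caller-side sketch of the simplified listing API. Note that `HadoopFSUtils` is `private[spark]`, so such a caller would live inside Spark itself; the filter rule and parallelism values below are hypothetical, only the signature follows the diff:
```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.spark.SparkContext
import org.apache.spark.util.HadoopFSUtils

// Hypothetical caller: the predicate formerly passed as `filterFun` is now expressed as
// part of the single PathFilter (made Serializable so it can be shipped to executors),
// and `isRootLevel` no longer appears in the public signature.
def listLeafFilesExample(sc: SparkContext, paths: Seq[Path], conf: Configuration): Unit = {
  val filter = new PathFilter with Serializable {
    override def accept(path: Path): Boolean = !path.getName.startsWith("_temporary")
  }
  val listed = HadoopFSUtils.parallelListLeafFiles(
    sc, paths, conf, filter,
    ignoreMissingFiles = true,
    ignoreLocality = false,
    parallelismThreshold = 32,
    parallelismMax = 10000)
  listed.foreach { case (root, files) => println(s"$root -> ${files.size} leaf files") }
}
```
SQL-side callers such as `InMemoryFileIndex.bulkListLeafFiles` achieve the same combination through the new `PathFilterWrapper` shown at the end of the diff.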
### Why are the changes needed?
The main purpose is to simplify the logic within `HadoopFSUtils` and to clean up its API.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing unit tests (e.g., `FileIndexSuite`)
Closes #29959 from sunchao/hadoop-fs-utils-followup.
Authored-by: Chao Sun <su...@apple.com>
Signed-off-by: Holden Karau <hk...@apple.com>
---
.../org/apache/spark/util/HadoopFSUtils.scala | 104 ++++-----------------
.../spark/sql/execution/command/CommandUtils.scala | 2 +-
.../execution/datasources/InMemoryFileIndex.scala | 19 ++--
3 files changed, 31 insertions(+), 94 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
index c0a135e..a3a528c 100644
--- a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
@@ -27,7 +27,6 @@ import org.apache.hadoop.fs.viewfs.ViewFileSystem
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.apache.spark._
-import org.apache.spark.annotation.Private
import org.apache.spark.internal.Logging
import org.apache.spark.metrics.source.HiveCatalogMetrics
@@ -45,8 +44,6 @@ private[spark] object HadoopFSUtils extends Logging {
* @param paths Input paths to list
* @param hadoopConf Hadoop configuration
* @param filter Path filter used to exclude leaf files from result
- * @param isRootLevel Whether the input paths are at the root level, i.e., they are the root
- * paths as opposed to nested paths encountered during recursive calls of this.
* @param ignoreMissingFiles Ignore missing files that happen during recursive listing
* (e.g., due to race conditions)
* @param ignoreLocality Whether to fetch data locality info when listing leaf files. If false,
@@ -57,11 +54,22 @@ private[spark] object HadoopFSUtils extends Logging {
* @param parallelismMax The maximum parallelism for listing. If the number of input paths is
* larger than this value, parallelism will be throttled to this value
* to avoid generating too many tasks.
- * @param filterFun Optional predicate on the leaf files. Files who failed the check will be
- * excluded from the results
* @return for each input path, the set of discovered files for the path
*/
def parallelListLeafFiles(
+ sc: SparkContext,
+ paths: Seq[Path],
+ hadoopConf: Configuration,
+ filter: PathFilter,
+ ignoreMissingFiles: Boolean,
+ ignoreLocality: Boolean,
+ parallelismThreshold: Int,
+ parallelismMax: Int): Seq[(Path, Seq[FileStatus])] = {
+ parallelListLeafFilesInternal(sc, paths, hadoopConf, filter, isRootLevel = true,
+ ignoreMissingFiles, ignoreLocality, parallelismThreshold, parallelismMax)
+ }
+
+ private def parallelListLeafFilesInternal(
sc: SparkContext,
paths: Seq[Path],
hadoopConf: Configuration,
@@ -70,8 +78,7 @@ private[spark] object HadoopFSUtils extends Logging {
ignoreMissingFiles: Boolean,
ignoreLocality: Boolean,
parallelismThreshold: Int,
- parallelismMax: Int,
- filterFun: Option[String => Boolean] = None): Seq[(Path, Seq[FileStatus])] = {
+ parallelismMax: Int): Seq[(Path, Seq[FileStatus])] = {
// Short-circuits parallel listing when serial listing is likely to be faster.
if (paths.size <= parallelismThreshold) {
@@ -85,8 +92,7 @@ private[spark] object HadoopFSUtils extends Logging {
ignoreLocality = ignoreLocality,
isRootPath = isRootLevel,
parallelismThreshold = parallelismThreshold,
- parallelismMax = parallelismMax,
- filterFun = filterFun)
+ parallelismMax = parallelismMax)
(path, leafFiles)
}
}
@@ -126,58 +132,16 @@ private[spark] object HadoopFSUtils extends Logging {
ignoreMissingFiles = ignoreMissingFiles,
ignoreLocality = ignoreLocality,
isRootPath = isRootLevel,
- filterFun = filterFun,
parallelismThreshold = Int.MaxValue,
parallelismMax = 0)
(path, leafFiles)
}.iterator
- }.map { case (path, statuses) =>
- val serializableStatuses = statuses.map { status =>
- // Turn FileStatus into SerializableFileStatus so we can send it back to the driver
- val blockLocations = status match {
- case f: LocatedFileStatus =>
- f.getBlockLocations.map { loc =>
- SerializableBlockLocation(
- loc.getNames,
- loc.getHosts,
- loc.getOffset,
- loc.getLength)
- }
-
- case _ =>
- Array.empty[SerializableBlockLocation]
- }
-
- SerializableFileStatus(
- status.getPath.toString,
- status.getLen,
- status.isDirectory,
- status.getReplication,
- status.getBlockSize,
- status.getModificationTime,
- status.getAccessTime,
- blockLocations)
- }
- (path.toString, serializableStatuses)
}.collect()
} finally {
sc.setJobDescription(previousJobDescription)
}
- // turn SerializableFileStatus back to Status
- statusMap.map { case (path, serializableStatuses) =>
- val statuses = serializableStatuses.map { f =>
- val blockLocations = f.blockLocations.map { loc =>
- new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length)
- }
- new LocatedFileStatus(
- new FileStatus(
- f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime,
- new Path(f.path)),
- blockLocations)
- }
- (new Path(path), statuses)
- }
+ statusMap.toSeq
}
// scalastyle:off argcount
@@ -197,7 +161,6 @@ private[spark] object HadoopFSUtils extends Logging {
ignoreMissingFiles: Boolean,
ignoreLocality: Boolean,
isRootPath: Boolean,
- filterFun: Option[String => Boolean],
parallelismThreshold: Int,
parallelismMax: Int): Seq[FileStatus] = {
@@ -245,19 +208,11 @@ private[spark] object HadoopFSUtils extends Logging {
Array.empty[FileStatus]
}
- def doFilter(statuses: Array[FileStatus]) = filterFun match {
- case Some(shouldFilterOut) =>
- statuses.filterNot(status => shouldFilterOut(status.getPath.getName))
- case None =>
- statuses
- }
-
- val filteredStatuses = doFilter(statuses)
val allLeafStatuses = {
- val (dirs, topLevelFiles) = filteredStatuses.partition(_.isDirectory)
+ val (dirs, topLevelFiles) = statuses.partition(_.isDirectory)
val nestedFiles: Seq[FileStatus] = contextOpt match {
case Some(context) if dirs.size > parallelismThreshold =>
- parallelListLeafFiles(
+ parallelListLeafFilesInternal(
context,
dirs.map(_.getPath),
hadoopConf = hadoopConf,
@@ -265,7 +220,6 @@ private[spark] object HadoopFSUtils extends Logging {
isRootLevel = false,
ignoreMissingFiles = ignoreMissingFiles,
ignoreLocality = ignoreLocality,
- filterFun = filterFun,
parallelismThreshold = parallelismThreshold,
parallelismMax = parallelismMax
).flatMap(_._2)
@@ -279,7 +233,6 @@ private[spark] object HadoopFSUtils extends Logging {
ignoreMissingFiles = ignoreMissingFiles,
ignoreLocality = ignoreLocality,
isRootPath = false,
- filterFun = filterFun,
parallelismThreshold = parallelismThreshold,
parallelismMax = parallelismMax)
}
@@ -289,8 +242,7 @@ private[spark] object HadoopFSUtils extends Logging {
}
val missingFiles = mutable.ArrayBuffer.empty[String]
- val filteredLeafStatuses = doFilter(allLeafStatuses)
- val resolvedLeafStatuses = filteredLeafStatuses.flatMap {
+ val resolvedLeafStatuses = allLeafStatuses.flatMap {
case f: LocatedFileStatus =>
Some(f)
@@ -339,22 +291,4 @@ private[spark] object HadoopFSUtils extends Logging {
resolvedLeafStatuses
}
// scalastyle:on argcount
-
- /** A serializable variant of HDFS's BlockLocation. */
- private case class SerializableBlockLocation(
- names: Array[String],
- hosts: Array[String],
- offset: Long,
- length: Long)
-
- /** A serializable variant of HDFS's FileStatus. */
- private case class SerializableFileStatus(
- path: String,
- length: Long,
- isDir: Boolean,
- blockReplication: Short,
- blockSize: Long,
- modificationTime: Long,
- accessTime: Long,
- blockLocations: Array[SerializableBlockLocation])
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
index 8bf7504..6495463 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
@@ -163,7 +163,7 @@ object CommandUtils extends Logging {
.getConfString("hive.exec.stagingdir", ".hive-staging")
val filter = new PathFilterIgnoreNonData(stagingDir)
val sizes = InMemoryFileIndex.bulkListLeafFiles(paths.flatten,
- sparkSession.sessionState.newHadoopConf(), filter, sparkSession, isRootLevel = true).map {
+ sparkSession.sessionState.newHadoopConf(), filter, sparkSession).map {
case (_, files) => files.map(_.getLen).sum
}
// the size is 0 where paths(i) is not defined and sizes(i) where it is defined
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index 130894e..2127595 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -128,7 +128,7 @@ class InMemoryFileIndex(
}
val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass))
val discovered = InMemoryFileIndex.bulkListLeafFiles(
- pathsToFetch.toSeq, hadoopConf, filter, sparkSession, isRootLevel = true)
+ pathsToFetch.toSeq, hadoopConf, filter, sparkSession)
discovered.foreach { case (path, leafFiles) =>
HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size)
fileStatusCache.putLeafFiles(path, leafFiles.toArray)
@@ -146,20 +146,17 @@ object InMemoryFileIndex extends Logging {
paths: Seq[Path],
hadoopConf: Configuration,
filter: PathFilter,
- sparkSession: SparkSession,
- isRootLevel: Boolean): Seq[(Path, Seq[FileStatus])] = {
+ sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = {
HadoopFSUtils.parallelListLeafFiles(
sc = sparkSession.sparkContext,
paths = paths,
hadoopConf = hadoopConf,
- filter = filter,
- isRootLevel = isRootLevel,
+ filter = new PathFilterWrapper(filter),
ignoreMissingFiles = sparkSession.sessionState.conf.ignoreMissingFiles,
ignoreLocality = sparkSession.sessionState.conf.ignoreDataLocality,
parallelismThreshold = sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold,
- parallelismMax = sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism,
- filterFun = Some(shouldFilterOut))
- }
+ parallelismMax = sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism)
+ }
/** Checks if we should filter out this path name. */
def shouldFilterOut(pathName: String): Boolean = {
@@ -175,3 +172,9 @@ object InMemoryFileIndex extends Logging {
exclude && !include
}
}
+
+private class PathFilterWrapper(val filter: PathFilter) extends PathFilter with Serializable {
+ override def accept(path: Path): Boolean = {
+ (filter == null || filter.accept(path)) && !InMemoryFileIndex.shouldFilterOut(path.getName)
+ }
+}