Posted to commits@spark.apache.org by ge...@apache.org on 2023/06/16 20:29:20 UTC

[spark] branch master updated: [SPARK-44081] Simplify PartitionedFileUtil API a little

This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8754e9a6e83 [SPARK-44081] Simplify PartitionedFileUtil API a little
8754e9a6e83 is described below

commit 8754e9a6e839ff4ca5f91b814861ba869a8c647b
Author: Ryan Johnson <ry...@databricks.com>
AuthorDate: Fri Jun 16 13:29:09 2023 -0700

    [SPARK-44081] Simplify PartitionedFileUtil API a little
    
    ### What changes were proposed in this pull request?
    
    Utility methods in `PartitionedFileUtil` take both `file: FileStatusWithMetadata` and `filePath: Path`, even though the latter can easily be obtained from the former.
    
    This was originally done for performance reasons, so that callers could pass a memoized `Path` and avoid the cost of an additional `FileStatus.getPath` call.
    
    Now that we have `FileStatusWithMetadata`, the redundancy is no longer needed: the new class can simply capture the path as a `lazy val`.
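
    For illustration only, below is a minimal, self-contained Scala sketch of the lazy-val capture this relies on. The stand-in classes (`FakeFileStatus`, `FileStatusWithMeta`, `LazyPathDemo`) are made up for the sketch and are not Spark's or Hadoop's real types; the point is simply that the wrapper derives the path at most once, on first access, so callers no longer need to pass a separately memoized `Path`.

    ```scala
    // Hypothetical stand-in for an object with an expensive accessor, like
    // Hadoop's FileStatus.getPath, which builds a new Path on every call.
    final case class FakeFileStatus(rawPath: String, len: Long) {
      def getPath: String = {
        println(s"deriving path for $rawPath") // visible cost, for the demo
        rawPath
      }
    }

    // Analogue of the patched FileStatusWithMetadata: the path is computed at
    // most once, so callers can read getPath freely without memoizing it.
    final case class FileStatusWithMeta(
        fileStatus: FakeFileStatus,
        metadata: Map[String, Any] = Map.empty) {
      lazy val getPath: String = fileStatus.getPath
      def getLen: Long = fileStatus.len
    }

    object LazyPathDemo extends App {
      val file = FileStatusWithMeta(FakeFileStatus("/data/part-00000.parquet", 1024L))
      println(file.getPath) // "deriving path ..." is printed here
      println(file.getPath) // cached value, no second derivation
      println(file.getLen)
    }
    ```

    Running the demo prints the "deriving path" line once even though `getPath` is read twice, which is why the extra `filePath` parameter can be dropped without reintroducing the repeated `FileStatus.getPath` cost.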
    
    ### Why are the changes needed?
    
    Simpler code, and it eliminates the risk of passing a mismatched path.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. Internal API change.
    
    ### How was this patch tested?
    
    Small refactor; existing unit tests suffice.
    
    Closes #41632 from ryan-johnson-databricks/partitioned-file-paths.
    
    Authored-by: Ryan Johnson <ry...@databricks.com>
    Signed-off-by: Gengliang Wang <ge...@apache.org>
---
 .../org/apache/spark/sql/execution/DataSourceScanExec.scala  | 12 +++---------
 .../org/apache/spark/sql/execution/PartitionedFileUtil.scala | 10 ++++------
 .../apache/spark/sql/execution/datasources/FileIndex.scala   |  3 ++-
 .../apache/spark/sql/execution/datasources/v2/FileScan.scala |  4 +---
 4 files changed, 10 insertions(+), 19 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 0d5091f4a97..243aaabc0cb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -624,9 +624,7 @@ case class FileSourceScanExec(
     logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
     val filesGroupedToBuckets =
       selectedPartitions.flatMap { p =>
-        p.files.map { f =>
-          PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values)
-        }
+        p.files.map(f => PartitionedFileUtil.getPartitionedFile(f, p.values))
       }.groupBy { f =>
         BucketingUtils
           .getBucketId(f.toPath.getName)
@@ -691,12 +689,9 @@ case class FileSourceScanExec(
 
     val splitFiles = selectedPartitions.flatMap { partition =>
       partition.files.flatMap { file =>
-        // getPath() is very expensive so we only want to call it once in this block:
-        val filePath = file.getPath
-
-        if (shouldProcess(filePath)) {
+        if (shouldProcess(file.getPath)) {
           val isSplitable = relation.fileFormat.isSplitable(
-              relation.sparkSession, relation.options, filePath) &&
+              relation.sparkSession, relation.options, file.getPath) &&
             // SPARK-39634: Allow file splitting in combination with row index generation once
             // the fix for PARQUET-2161 is available.
             (!relation.fileFormat.isInstanceOf[ParquetSource]
@@ -704,7 +699,6 @@ case class FileSourceScanExec(
           PartitionedFileUtil.splitFiles(
             sparkSession = relation.sparkSession,
             file = file,
-            filePath = filePath,
             isSplitable = isSplitable,
             maxSplitBytes = maxSplitBytes,
             partitionValues = partition.values
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala
index dcc2114b54d..cc234565d11 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution
 
-import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}
+import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus}
 
 import org.apache.spark.paths.SparkPath
 import org.apache.spark.sql.SparkSession
@@ -28,7 +28,6 @@ object PartitionedFileUtil {
   def splitFiles(
       sparkSession: SparkSession,
       file: FileStatusWithMetadata,
-      filePath: Path,
       isSplitable: Boolean,
       maxSplitBytes: Long,
       partitionValues: InternalRow): Seq[PartitionedFile] = {
@@ -37,20 +36,19 @@ object PartitionedFileUtil {
         val remaining = file.getLen - offset
         val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
         val hosts = getBlockHosts(getBlockLocations(file.fileStatus), offset, size)
-        PartitionedFile(partitionValues, SparkPath.fromPath(filePath), offset, size, hosts,
+        PartitionedFile(partitionValues, SparkPath.fromPath(file.getPath), offset, size, hosts,
           file.getModificationTime, file.getLen, file.metadata)
       }
     } else {
-      Seq(getPartitionedFile(file, filePath, partitionValues))
+      Seq(getPartitionedFile(file, partitionValues))
     }
   }
 
   def getPartitionedFile(
       file: FileStatusWithMetadata,
-      filePath: Path,
       partitionValues: InternalRow): PartitionedFile = {
     val hosts = getBlockHosts(getBlockLocations(file.fileStatus), 0, file.getLen)
-    PartitionedFile(partitionValues, SparkPath.fromPath(filePath), 0, file.getLen, hosts,
+    PartitionedFile(partitionValues, SparkPath.fromPath(file.getPath), 0, file.getLen, hosts,
       file.getModificationTime, file.getLen, file.metadata)
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala
index 5331f29bc5b..1b28294e94a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala
@@ -30,7 +30,8 @@ import org.apache.spark.sql.types.StructType
  */
 case class FileStatusWithMetadata(fileStatus: FileStatus, metadata: Map[String, Any] = Map.empty) {
   // Wrapper methods to improve source compatibility in code that still expects a [[FileStatus]].
-  def getPath: Path = fileStatus.getPath
+  // NOTE: getPath() is very expensive, so we only want to call it once (if accessed at all).
+  lazy val getPath: Path = fileStatus.getPath
   def getLen: Long = fileStatus.getLen
   def getModificationTime: Long = fileStatus.getModificationTime
   def isDirectory: Boolean = fileStatus.isDirectory
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
index 0cfb55ab407..a52ea901f02 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
@@ -150,12 +150,10 @@ trait FileScan extends Scan
         partition.values
       }
       partition.files.flatMap { file =>
-        val filePath = file.getPath
         PartitionedFileUtil.splitFiles(
           sparkSession = sparkSession,
           file = file,
-          filePath = filePath,
-          isSplitable = isSplitable(filePath),
+          isSplitable = isSplitable(file.getPath),
           maxSplitBytes = maxSplitBytes,
           partitionValues = partitionValues
         )


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org