Posted to commits@spark.apache.org by ge...@apache.org on 2023/06/16 20:29:20 UTC
[spark] branch master updated: [SPARK-44081] Simplify PartitionedFileUtil API a little
This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8754e9a6e83 [SPARK-44081] Simplify PartitionedFileUtil API a little
8754e9a6e83 is described below
commit 8754e9a6e839ff4ca5f91b814861ba869a8c647b
Author: Ryan Johnson <ry...@databricks.com>
AuthorDate: Fri Jun 16 13:29:09 2023 -0700
[SPARK-44081] Simplify PartitionedFileUtil API a little
### What changes were proposed in this pull request?
Utility methods in `PartitionedFileUtil` take both `file: FileStatusWithMetadata` and `filePath: Path`, even though the latter can be obtained easily from the former.
This was originally done for performance reasons, so that callers could pass a memoized `Path` and avoid the cost of an additional `FileStatus.getPath` call.
Now that we have `FileStatusWithMetadata`, the redundancy is no longer needed -- the new class can simply capture the path as a lazy val.
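For reference, a trimmed sketch of the wrapper and its memoized path (the actual change is the one-line `lazy val` in the `FileIndex.scala` hunk below):
```scala
import org.apache.hadoop.fs.{FileStatus, Path}

// Trimmed sketch: the wrapper computes the Path from the underlying
// FileStatus at most once, on first access, and caches it thereafter.
case class FileStatusWithMetadata(fileStatus: FileStatus, metadata: Map[String, Any] = Map.empty) {
  // getPath() is very expensive, so capture it as a lazy val.
  lazy val getPath: Path = fileStatus.getPath
  def getLen: Long = fileStatus.getLen
}
```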
### Why are the changes needed?
Simpler code (and it eliminates the risk of passing a `Path` that does not match the `file` argument).
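To make that risk concrete, a hypothetical call site under the old and new signatures (`f` and `p.values` as in the diff below; `otherFile` is invented for illustration):
```scala
// Old signature: nothing tied the Path argument to `f`, so a mismatched
// path compiled fine (hypothetical misuse):
PartitionedFileUtil.getPartitionedFile(f, otherFile.getPath, p.values)

// New signature: the path is always derived from `f` itself, so no
// mismatch is possible:
PartitionedFileUtil.getPartitionedFile(f, p.values)
```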
### Does this PR introduce _any_ user-facing change?
No. Internal API change.
### How was this patch tested?
Small refactor, existing unit tests suffice.
Closes #41632 from ryan-johnson-databricks/partitioned-file-paths.
Authored-by: Ryan Johnson <ry...@databricks.com>
Signed-off-by: Gengliang Wang <ge...@apache.org>
---
.../org/apache/spark/sql/execution/DataSourceScanExec.scala | 12 +++---------
.../org/apache/spark/sql/execution/PartitionedFileUtil.scala | 10 ++++------
.../apache/spark/sql/execution/datasources/FileIndex.scala | 3 ++-
.../apache/spark/sql/execution/datasources/v2/FileScan.scala | 4 +---
4 files changed, 10 insertions(+), 19 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 0d5091f4a97..243aaabc0cb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -624,9 +624,7 @@ case class FileSourceScanExec(
logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
val filesGroupedToBuckets =
selectedPartitions.flatMap { p =>
- p.files.map { f =>
- PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values)
- }
+ p.files.map(f => PartitionedFileUtil.getPartitionedFile(f, p.values))
}.groupBy { f =>
BucketingUtils
.getBucketId(f.toPath.getName)
@@ -691,12 +689,9 @@ case class FileSourceScanExec(
val splitFiles = selectedPartitions.flatMap { partition =>
partition.files.flatMap { file =>
- // getPath() is very expensive so we only want to call it once in this block:
- val filePath = file.getPath
-
- if (shouldProcess(filePath)) {
+ if (shouldProcess(file.getPath)) {
val isSplitable = relation.fileFormat.isSplitable(
- relation.sparkSession, relation.options, filePath) &&
+ relation.sparkSession, relation.options, file.getPath) &&
// SPARK-39634: Allow file splitting in combination with row index generation once
// the fix for PARQUET-2161 is available.
(!relation.fileFormat.isInstanceOf[ParquetSource]
@@ -704,7 +699,6 @@ case class FileSourceScanExec(
PartitionedFileUtil.splitFiles(
sparkSession = relation.sparkSession,
file = file,
- filePath = filePath,
isSplitable = isSplitable,
maxSplitBytes = maxSplitBytes,
partitionValues = partition.values
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala
index dcc2114b54d..cc234565d11 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.execution
-import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}
+import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus}
import org.apache.spark.paths.SparkPath
import org.apache.spark.sql.SparkSession
@@ -28,7 +28,6 @@ object PartitionedFileUtil {
def splitFiles(
sparkSession: SparkSession,
file: FileStatusWithMetadata,
- filePath: Path,
isSplitable: Boolean,
maxSplitBytes: Long,
partitionValues: InternalRow): Seq[PartitionedFile] = {
@@ -37,20 +36,19 @@ object PartitionedFileUtil {
val remaining = file.getLen - offset
val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
val hosts = getBlockHosts(getBlockLocations(file.fileStatus), offset, size)
- PartitionedFile(partitionValues, SparkPath.fromPath(filePath), offset, size, hosts,
+ PartitionedFile(partitionValues, SparkPath.fromPath(file.getPath), offset, size, hosts,
file.getModificationTime, file.getLen, file.metadata)
}
} else {
- Seq(getPartitionedFile(file, filePath, partitionValues))
+ Seq(getPartitionedFile(file, partitionValues))
}
}
def getPartitionedFile(
file: FileStatusWithMetadata,
- filePath: Path,
partitionValues: InternalRow): PartitionedFile = {
val hosts = getBlockHosts(getBlockLocations(file.fileStatus), 0, file.getLen)
- PartitionedFile(partitionValues, SparkPath.fromPath(filePath), 0, file.getLen, hosts,
+ PartitionedFile(partitionValues, SparkPath.fromPath(file.getPath), 0, file.getLen, hosts,
file.getModificationTime, file.getLen, file.metadata)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala
index 5331f29bc5b..1b28294e94a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala
@@ -30,7 +30,8 @@ import org.apache.spark.sql.types.StructType
*/
case class FileStatusWithMetadata(fileStatus: FileStatus, metadata: Map[String, Any] = Map.empty) {
// Wrapper methods to improve source compatibility in code that still expects a [[FileStatus]].
- def getPath: Path = fileStatus.getPath
+ // NOTE: getPath() is very expensive, so we only want to call it once (if accessed at all).
+ lazy val getPath: Path = fileStatus.getPath
def getLen: Long = fileStatus.getLen
def getModificationTime: Long = fileStatus.getModificationTime
def isDirectory: Boolean = fileStatus.isDirectory
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
index 0cfb55ab407..a52ea901f02 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
@@ -150,12 +150,10 @@ trait FileScan extends Scan
partition.values
}
partition.files.flatMap { file =>
- val filePath = file.getPath
PartitionedFileUtil.splitFiles(
sparkSession = sparkSession,
file = file,
- filePath = filePath,
- isSplitable = isSplitable(filePath),
+ isSplitable = isSplitable(file.getPath),
maxSplitBytes = maxSplitBytes,
partitionValues = partitionValues
)