You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2023/12/09 23:22:24 UTC
(spark) branch branch-3.4 updated: [SPARK-46339][SS] Directory with batch number name should not be treated as metadata log
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new 4e80b3a09407 [SPARK-46339][SS] Directory with batch number name should not be treated as metadata log
4e80b3a09407 is described below
commit 4e80b3a09407042f7c596963dcb4fc59e68755ab
Author: Liang-Chi Hsieh <vi...@gmail.com>
AuthorDate: Sat Dec 9 15:20:55 2023 -0800
[SPARK-46339][SS] Directory with batch number name should not be treated as metadata log
### What changes were proposed in this pull request?
This patch updates the document of `CheckpointFileManager.list` method to reflect the fact it is used to return both files and directories to reduce confusion.
For usages like `HDFSMetadataLog`, which assume the file statuses returned by `list` are all files, we add a filter there to avoid a confusing error.
### Why are the changes needed?
`HDFSMetadataLog` takes a metadata path as parameter. When it goes to retrieve all batch metadata, it calls `CheckpointFileManager.list` to get all files under the metadata path. However, currently all implementations of `CheckpointFileManager.list` return all files/directories under the given path. So if there is a directory whose name is a batch number (a long value), the directory will be returned too and cause trouble when `HDFSMetadataLog` goes to read it.
Actually, `CheckpointFileManager.list` method clearly defines that it lists the "files" in a path. That being said, current implementations don't follow the doc. We tried to make `list` method implementations only return files, but some usages (state metadata) of the `list` method already break that assumption and rely on directories returned by `list`. So we simply update the `list` method document to explicitly define that it returns both files/directories. We add a filter in `HDFSMetad [...]
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added test
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #44272 from viirya/fix_metadatalog.
Authored-by: Liang-Chi Hsieh <vi...@gmail.com>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
(cherry picked from commit 75805f07f5caeb01104a7352b02790d03a043ded)
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
(cherry picked from commit 28a8b181e96d4ce71e2f9888910214d14a859b7d)
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
.../sql/execution/streaming/CheckpointFileManager.scala | 4 ++--
.../spark/sql/execution/streaming/HDFSMetadataLog.scala | 2 ++
.../spark/sql/execution/streaming/HDFSMetadataLogSuite.scala | 12 ++++++++++++
3 files changed, 16 insertions(+), 2 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
index 013efd3c7bae..b2a3b8d73d4e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
@@ -65,10 +65,10 @@ trait CheckpointFileManager {
/** Open a file for reading, or throw exception if it does not exist. */
def open(path: Path): FSDataInputStream
- /** List the files in a path that match a filter. */
+ /** List the files/directories in a path that match a filter. */
def list(path: Path, filter: PathFilter): Array[FileStatus]
- /** List all the files in a path. */
+ /** List all the files/directories in a path. */
def list(path: Path): Array[FileStatus] = {
list(path, (_: Path) => true)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 2b0172bb9555..9a811db679d0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -325,6 +325,8 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
/** List the available batches on file system. */
protected def listBatches: Array[Long] = {
val batchIds = fileManager.list(metadataPath, batchFilesFilter)
+ // Batches must be files
+ .filter(f => f.isFile)
.map(f => pathToBatchId(f.getPath)) ++
// Iterate over keySet is not thread safe. We call `toArray` to make a copy in the lock to
// elimiate the race condition.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index 980d532dd477..08f245135f58 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -33,6 +33,18 @@ class HDFSMetadataLogSuite extends SharedSparkSession {
private implicit def toOption[A](a: A): Option[A] = Option(a)
+ test("SPARK-46339: Directory with number name should not be treated as metadata log") {
+ withTempDir { temp =>
+ val dir = new File(temp, "dir")
+ val metadataLog = new HDFSMetadataLog[String](spark, dir.getAbsolutePath)
+ assert(metadataLog.metadataPath.toString.endsWith("/dir"))
+
+ // Create a directory with batch id 0
+ new File(dir, "0").mkdir()
+ assert(metadataLog.getLatest() === None)
+ }
+ }
+
test("HDFSMetadataLog: basic") {
withTempDir { temp =>
val dir = new File(temp, "dir") // use non-existent directory to test whether log make the dir
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org