You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2023/12/09 23:22:24 UTC

(spark) branch branch-3.4 updated: [SPARK-46339][SS] Directory with batch number name should not be treated as metadata log

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 4e80b3a09407 [SPARK-46339][SS] Directory with batch number name should not be treated as metadata log
4e80b3a09407 is described below

commit 4e80b3a09407042f7c596963dcb4fc59e68755ab
Author: Liang-Chi Hsieh <vi...@gmail.com>
AuthorDate: Sat Dec 9 15:20:55 2023 -0800

    [SPARK-46339][SS] Directory with batch number name should not be treated as metadata log
    
    ### What changes were proposed in this pull request?
    
    This patch updates the document of `CheckpointFileManager.list` method to reflect the fact it is used to return both files and directories to reduce confusion.
    
    For the usage like `HDFSMetadataLog` where it assumes returned file status by `list` are all files, we add a filter there to avoid confusing error.
    
    ### Why are the changes needed?
    
    `HDFSMetadataLog` takes a metadata path as parameter. When it goes to retrieve all batches metadata, it calls `CheckpointFileManager.list` to get all files under the metadata path. However, currently all implementations of `CheckpointFileManager.list` return all files/directories under the given path. So if there is a directory whose name is a batch number (a long value), the directory will be returned too and cause trouble when `HDFSMetadataLog` goes to read it.
    
    Actually, the `CheckpointFileManager.list` method clearly defines that it lists the "files" in a path. That being said, current implementations don't follow the doc. We tried to make the `list` method implementations return only files, but some usages (state metadata) of the `list` method already break that assumption and rely on directories returned by `list`. So we simply update the `list` method's documentation to explicitly state that it returns both files and directories. We add a filter in `HDFSMetad [...]
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Added test
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #44272 from viirya/fix_metadatalog.
    
    Authored-by: Liang-Chi Hsieh <vi...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
    (cherry picked from commit 75805f07f5caeb01104a7352b02790d03a043ded)
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
    (cherry picked from commit 28a8b181e96d4ce71e2f9888910214d14a859b7d)
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../sql/execution/streaming/CheckpointFileManager.scala      |  4 ++--
 .../spark/sql/execution/streaming/HDFSMetadataLog.scala      |  2 ++
 .../spark/sql/execution/streaming/HDFSMetadataLogSuite.scala | 12 ++++++++++++
 3 files changed, 16 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
index 013efd3c7bae..b2a3b8d73d4e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
@@ -65,10 +65,10 @@ trait CheckpointFileManager {
   /** Open a file for reading, or throw exception if it does not exist. */
   def open(path: Path): FSDataInputStream
 
-  /** List the files in a path that match a filter. */
+  /** List the files/directories in a path that match a filter. */
   def list(path: Path, filter: PathFilter): Array[FileStatus]
 
-  /** List all the files in a path. */
+  /** List all the files/directories in a path. */
   def list(path: Path): Array[FileStatus] = {
     list(path, (_: Path) => true)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 2b0172bb9555..9a811db679d0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -325,6 +325,8 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
   /** List the available batches on file system. */
   protected def listBatches: Array[Long] = {
     val batchIds = fileManager.list(metadataPath, batchFilesFilter)
+      // Batches must be files
+      .filter(f => f.isFile)
       .map(f => pathToBatchId(f.getPath)) ++
       // Iterate over keySet is not thread safe. We call `toArray` to make a copy in the lock to
       // elimiate the race condition.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index 980d532dd477..08f245135f58 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -33,6 +33,18 @@ class HDFSMetadataLogSuite extends SharedSparkSession {
 
   private implicit def toOption[A](a: A): Option[A] = Option(a)
 
+  test("SPARK-46339: Directory with number name should not be treated as metadata log") {
+    withTempDir { temp =>
+      val dir = new File(temp, "dir")
+      val metadataLog = new HDFSMetadataLog[String](spark, dir.getAbsolutePath)
+      assert(metadataLog.metadataPath.toString.endsWith("/dir"))
+
+      // Create a directory with batch id 0
+      new File(dir, "0").mkdir()
+      assert(metadataLog.getLatest() === None)
+    }
+  }
+
   test("HDFSMetadataLog: basic") {
     withTempDir { temp =>
       val dir = new File(temp, "dir") // use non-existent directory to test whether log make the dir


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org