You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ka...@apache.org on 2023/02/08 22:12:38 UTC

[spark] branch master updated: [SPARK-42379][SS] Use FileSystem.exists in FileSystemBasedCheckpointFileManager.exists

This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a76bd9dcf96 [SPARK-42379][SS] Use FileSystem.exists in FileSystemBasedCheckpointFileManager.exists
a76bd9dcf96 is described below

commit a76bd9dcf961e0e8bfd6e14ae30a249667f04982
Author: Jungtaek Lim <ka...@gmail.com>
AuthorDate: Thu Feb 9 07:12:13 2023 +0900

    [SPARK-42379][SS] Use FileSystem.exists in FileSystemBasedCheckpointFileManager.exists
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to use FileSystem.exists in FileSystemBasedCheckpointFileManager.exists, making it consistent with the other methods in FileSystemBasedCheckpointFileManager.
    
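    This PR also removes the test case "FAILED_RENAME_PATH: rename when destination path already exists" from QueryExecutionErrorsSuite. The test relies on an incorrect custom file system (`FakeFileSystemAlwaysExists`) whose `exists` always returns true, while its `getFileStatus` still reflects the real file state. As a result, `FileSystemBasedCheckpointFileManager.exists` (which previously used `getFileStatus`) and `FileSystem.exists` disagreed. Once both go through `FileSystem.exists`, the test's premise no longer holds.
    (See the detailed explanation at https://github.com/apache/spark/pull/39936#issuecomment-1422101967)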
    
    ### Why are the changes needed?
    
    Other methods in FileSystemBasedCheckpointFileManager already use FileSystem.exists whenever they check whether a path exists.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing tests.
    
    Closes #39936 from HeartSaVioR/MINOR-FileSystemBasedCheckpointFileManager-calls-fs-exists-in-exists.
    
    Authored-by: Jungtaek Lim <ka...@gmail.com>
    Signed-off-by: Jungtaek Lim <ka...@gmail.com>
---
 .../streaming/CheckpointFileManager.scala          |  7 +----
 .../sql/errors/QueryExecutionErrorsSuite.scala     | 35 ----------------------
 2 files changed, 1 insertion(+), 41 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
index 013efd3c7ba..6df0a2f3063 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
@@ -256,12 +256,7 @@ class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration
     fs.open(path)
   }
 
-  override def exists(path: Path): Boolean =
-    try {
-      fs.getFileStatus(path) != null
-    } catch {
-      case _: FileNotFoundException => false
-    }
+  override def exists(path: Path): Boolean = fs.exists(path)
 
   override def renameTempFile(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit = {
     if (!overwriteIfPossible && fs.exists(dstPath)) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
index 89e0bf7fe41..90180d5e600 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
@@ -634,37 +634,6 @@ class QueryExecutionErrorsSuite
       sqlState = "0A000")
   }
 
-  test("FAILED_RENAME_PATH: rename when destination path already exists") {
-    withTempPath { p =>
-      withSQLConf(
-        "spark.sql.streaming.checkpointFileManagerClass" ->
-          classOf[FileSystemBasedCheckpointFileManager].getName,
-        "fs.file.impl" -> classOf[FakeFileSystemAlwaysExists].getName,
-        // FileSystem caching could cause a different implementation of fs.file to be used
-        "fs.file.impl.disable.cache" -> "true") {
-        val checkpointLocation = p.getAbsolutePath
-
-        val ds = spark.readStream.format("rate").load()
-        val e = intercept[SparkConcurrentModificationException] {
-          ds.writeStream
-            .option("checkpointLocation", checkpointLocation)
-            .queryName("_")
-            .format("memory")
-            .start()
-        }
-
-        val expectedPath = p.toURI
-        checkError(
-          exception = e.getCause.asInstanceOf[SparkFileAlreadyExistsException],
-          errorClass = "FAILED_RENAME_PATH",
-          sqlState = Some("42K04"),
-          matchPVals = true,
-          parameters = Map("sourcePath" -> s"$expectedPath.+",
-            "targetPath" -> s"$expectedPath.+"))
-      }
-    }
-  }
-
   test("RENAME_SRC_PATH_NOT_FOUND: rename the file which source path does not exist") {
     withTempPath { p =>
       withSQLConf(
@@ -805,10 +774,6 @@ class FakeFileSystemSetPermission extends LocalFileSystem {
   }
 }
 
-class FakeFileSystemAlwaysExists extends DebugFilesystem {
-  override def exists(f: Path): Boolean = true
-}
-
 class FakeFileSystemNeverExists extends DebugFilesystem {
   override def exists(f: Path): Boolean = false
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org