You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2016/09/21 09:07:24 UTC

spark git commit: [SPARK-17599] Prevent ListingFileCatalog from failing if path doesn't exist

Repository: spark
Updated Branches:
  refs/heads/master 3977223a3 -> 28fafa3ee


[SPARK-17599] Prevent ListingFileCatalog from failing if path doesn't exist

## What changes were proposed in this pull request?

The `ListingFileCatalog` lists files given a set of resolved paths. If a folder is deleted at any time between when the paths were resolved and when the file catalog checks the folder, the Spark job fails. This may, for example, abruptly stop long-running Structured Streaming jobs.

Folders may be deleted by users or automatically by retention policies. These cases should not prevent jobs from successfully completing.

## How was this patch tested?

Unit test in `FileCatalogSuite`

Author: Burak Yavuz <br...@gmail.com>

Closes #15153 from brkyvz/SPARK-17599.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/28fafa3e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/28fafa3e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/28fafa3e

Branch: refs/heads/master
Commit: 28fafa3ee8f3478fa441e7bd6c8fd4ab482ca98e
Parents: 3977223
Author: Burak Yavuz <br...@gmail.com>
Authored: Wed Sep 21 17:07:16 2016 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Wed Sep 21 17:07:16 2016 +0800

----------------------------------------------------------------------
 .../sql/execution/datasources/ListingFileCatalog.scala  | 12 ++++++++++--
 .../sql/execution/datasources/FileCatalogSuite.scala    | 11 +++++++++++
 2 files changed, 21 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/28fafa3e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
index 60742bd..3253208 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import java.io.FileNotFoundException
+
 import scala.collection.mutable
 
 import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}
@@ -97,8 +99,14 @@ class ListingFileCatalog(
         logTrace(s"Listing $path on driver")
 
         val childStatuses = {
-          val stats = fs.listStatus(path)
-          if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
+          try {
+            val stats = fs.listStatus(path)
+            if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
+          } catch {
+            case _: FileNotFoundException =>
+              logWarning(s"The directory $path was not found. Was it deleted very recently?")
+              Array.empty[FileStatus]
+          }
         }
 
         childStatuses.map {

http://git-wip-us.apache.org/repos/asf/spark/blob/28fafa3e/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
index 0d9ea51..5c8d322 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
@@ -67,4 +67,15 @@ class FileCatalogSuite extends SharedSQLContext {
 
     }
   }
+
+  test("ListingFileCatalog: folders that don't exist don't throw exceptions") {
+    withTempDir { dir =>
+      val deletedFolder = new File(dir, "deleted")
+      assert(!deletedFolder.exists())
+      val catalog1 = new ListingFileCatalog(
+        spark, Seq(new Path(deletedFolder.getCanonicalPath)), Map.empty, None)
+      // doesn't throw an exception
+      assert(catalog1.listLeafFiles(catalog1.paths).isEmpty)
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org