Posted to commits@spark.apache.org by gu...@apache.org on 2018/05/24 05:21:10 UTC

spark git commit: [SPARK-24364][SS] Prevent InMemoryFileIndex from failing if file path doesn't exist

Repository: spark
Updated Branches:
  refs/heads/master e108f84f5 -> 8a545822d


[SPARK-24364][SS] Prevent InMemoryFileIndex from failing if file path doesn't exist

## What changes were proposed in this pull request?

This PR is a follow-up to https://github.com/apache/spark/pull/15153 and completes SPARK-17599.

A `FileSystem` operation (`fs.getFileBlockLocations`) can still fail if the file path no longer exists, for example when a file is deleted between the directory listing and the block-location lookup. See the exception message below:

```
Error occurred while processing: File does not exist: /rel/00171151/input/PJ/part-00136-b6403bac-a240-44f8-a792-fc2e174682b7-c000.csv
...
java.io.FileNotFoundException: File does not exist: /rel/00171151/input/PJ/part-00136-b6403bac-a240-44f8-a792-fc2e174682b7-c000.csv
...
org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:249)
	at org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:229)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$anonfun$org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$listLeafFiles$3.apply(InMemoryFileIndex.scala:314)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$anonfun$org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$listLeafFiles$3.apply(InMemoryFileIndex.scala:297)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$.org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$listLeafFiles(InMemoryFileIndex.scala:297)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$anonfun$org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$bulkListLeafFiles$1.apply(InMemoryFileIndex.scala:174)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$anonfun$org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$bulkListLeafFiles$1.apply(InMemoryFileIndex.scala:173)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$.org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$bulkListLeafFiles(InMemoryFileIndex.scala:173)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.listLeafFiles(InMemoryFileIndex.scala:126)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.refresh0(InMemoryFileIndex.scala:91)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>(InMemoryFileIndex.scala:67)
	at org.apache.spark.sql.execution.datasources.DataSource.tempFileIndex$lzycompute$1(DataSource.scala:161)
	at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$tempFileIndex$1(DataSource.scala:152)
	at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:166)
	at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:261)
	at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:94)
	at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:94)
	at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:33)
	at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:196)
	at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:206)
	at com.hwx.StreamTest$.main(StreamTest.scala:97)
	at com.hwx.StreamTest.main(StreamTest.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:906)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:197)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:227)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /rel/00171151/input/PJ/part-00136-b6403bac-a240-44f8-a792-fc2e174682b7-c000.csv
...
```

So, this PR fixes it by logging a warning for the missing files and skipping them, instead of failing the whole scan.
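
Distilled from the diff below, the core of the fix is a catch-and-collect pattern around the block-location lookup (a self-contained sketch; the helper name `resolveLeafFile` is invented here for illustration and does not exist in the patch):

```
import java.io.FileNotFoundException

import scala.collection.mutable

import org.apache.hadoop.fs.{FileStatus, FileSystem, LocatedFileStatus}

// Resolve one listed file to a LocatedFileStatus. If the file was deleted
// between the directory listing and this lookup, record it as missing and
// return None so the scan can continue; the caller warns about the whole
// batch once at the end.
def resolveLeafFile(
    fs: FileSystem,
    f: FileStatus,
    missingFiles: mutable.ArrayBuffer[String]): Option[LocatedFileStatus] = {
  try {
    val locations = fs.getFileBlockLocations(f, 0, f.getLen)
    val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication,
      f.getBlockSize, f.getModificationTime, 0, null, null, null, null,
      f.getPath, locations)
    if (f.isSymlink) {
      lfs.setSymlink(f.getSymlink)
    }
    Some(lfs)
  } catch {
    case _: FileNotFoundException =>
      missingFiles += f.getPath.toString
      None
  }
}
```

Returning `Option` and switching the caller from `map` to `flatMap` is what lets missing files drop out of the result while still being reported once, in a single warning, after the scan.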

## How was this patch tested?

It is hard to write a deterministic test for this race condition, so it was manually tested multiple times (see the reproduction sketch below).
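
For reference, the failure can be reproduced by racing a streaming file source against concurrent deletes (a hypothetical sketch, not part of the patch; the path and schema are made up):

```
import org.apache.spark.sql.SparkSession

// Start a file-source stream over a directory, then delete some of its CSV
// files from another shell while the initial scan is in flight. Before this
// patch the scan could fail with FileNotFoundException; after it, the
// missing files are logged as a warning and skipped.
val spark = SparkSession.builder().appName("SPARK-24364-repro").getOrCreate()

val df = spark.readStream
  .format("csv")
  .schema("value STRING")
  .load("/tmp/spark-24364/input")

df.writeStream
  .format("console")
  .start()
  .awaitTermination()
```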

Author: hyukjinkwon <gu...@apache.org>

Closes #21408 from HyukjinKwon/missing-files.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8a545822
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8a545822
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8a545822

Branch: refs/heads/master
Commit: 8a545822d0cc3a866ef91a94e58ea5c8b1014007
Parents: e108f84
Author: hyukjinkwon <gu...@apache.org>
Authored: Thu May 24 13:21:02 2018 +0800
Committer: hyukjinkwon <gu...@apache.org>
Committed: Thu May 24 13:21:02 2018 +0800

----------------------------------------------------------------------
 .../datasources/InMemoryFileIndex.scala         | 32 +++++++++++++++-----
 1 file changed, 24 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8a545822/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index 739d1f4..9d9f8bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -294,9 +294,12 @@ object InMemoryFileIndex extends Logging {
       if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles
     }
 
-    allLeafStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
+    val missingFiles = mutable.ArrayBuffer.empty[String]
+    val filteredLeafStatuses = allLeafStatuses.filterNot(
+      status => shouldFilterOut(status.getPath.getName))
+    val resolvedLeafStatuses = filteredLeafStatuses.flatMap {
       case f: LocatedFileStatus =>
-        f
+        Some(f)
 
       // NOTE:
       //
@@ -311,14 +314,27 @@ object InMemoryFileIndex extends Logging {
         // The other constructor of LocatedFileStatus will call FileStatus.getPermission(),
         // which is very slow on some file system (RawLocalFileSystem, which is launch a
         // subprocess and parse the stdout).
-        val locations = fs.getFileBlockLocations(f, 0, f.getLen)
-        val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
-          f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
-        if (f.isSymlink) {
-          lfs.setSymlink(f.getSymlink)
+        try {
+          val locations = fs.getFileBlockLocations(f, 0, f.getLen)
+          val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
+            f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
+          if (f.isSymlink) {
+            lfs.setSymlink(f.getSymlink)
+          }
+          Some(lfs)
+        } catch {
+          case _: FileNotFoundException =>
+            missingFiles += f.getPath.toString
+            None
         }
-        lfs
     }
+
+    if (missingFiles.nonEmpty) {
+      logWarning(
+        s"the following files were missing during file scan:\n  ${missingFiles.mkString("\n  ")}")
+    }
+
+    resolvedLeafStatuses
   }
 
   /** Checks if we should filter out this path name. */


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org