You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2020/03/17 21:24:19 UTC

[spark] branch master updated: [SPARK-31047][SQL] Improve file listing for ViewFileSystem

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e4e08f  [SPARK-31047][SQL] Improve file listing for ViewFileSystem
4e4e08f is described below

commit 4e4e08f372db888797fd23faca88ac02d9466d5a
Author: manuzhang <ow...@gmail.com>
AuthorDate: Tue Mar 17 14:23:28 2020 -0700

    [SPARK-31047][SQL] Improve file listing for ViewFileSystem
    
    ### What changes were proposed in this pull request?
    Use `listLocatedStatus` when `InMemoryFileIndex` is listing files from a `ViewFileSystem`, which should delegate to the `listLocatedStatus` of `DistributedFileSystem`.
    
    ### Why are the changes needed?
    When `ViewFileSystem` is used to manage several `DistributedFileSystem` instances, this change improves the performance of file listing, especially when there are many files.
    
    ### Does this PR introduce any user-facing change?
    No.
    
    ### How was this patch tested?
    Existing tests.
    
    Closes #27801 from manuzhang/spark-31047.
    
    Authored-by: manuzhang <ow...@gmail.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../execution/datasources/InMemoryFileIndex.scala  |  3 ++-
 .../sql/execution/datasources/FileIndexSuite.scala | 25 +++++++++++++++++++++-
 2 files changed, 26 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index cac2d6e..84160f3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -23,6 +23,7 @@ import scala.collection.mutable
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
+import org.apache.hadoop.fs.viewfs.ViewFileSystem
 import org.apache.hadoop.hdfs.DistributedFileSystem
 import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 
@@ -313,7 +314,7 @@ object InMemoryFileIndex extends Logging {
         // to retrieve the file status with the file block location. The reason to still fallback
         // to listStatus is because the default implementation would potentially throw a
         // FileNotFoundException which is better handled by doing the lookups manually below.
-        case _: DistributedFileSystem if !ignoreLocality =>
+        case (_: DistributedFileSystem | _: ViewFileSystem) if !ignoreLocality =>
           val remoteIter = fs.listLocatedStatus(path)
           new Iterator[LocatedFileStatus]() {
             def next(): LocatedFileStatus = remoteIter.next
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index 553773e..ea15f18 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -22,7 +22,11 @@ import java.net.URI
 
 import scala.collection.mutable
 
-import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path, RawLocalFileSystem}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path, RawLocalFileSystem, RemoteIterator}
+import org.apache.hadoop.fs.viewfs.ViewFileSystem
+import org.mockito.ArgumentMatchers.any
+import org.mockito.Mockito.{mock, when}
 
 import org.apache.spark.SparkException
 import org.apache.spark.metrics.source.HiveCatalogMetrics
@@ -465,6 +469,25 @@ class FileIndexSuite extends SharedSparkSession {
       }
     }
   }
+
+  test("SPARK-31047 - Improve file listing for ViewFileSystem") {
+    val path = mock(classOf[Path])
+    val dfs = mock(classOf[ViewFileSystem])
+    when(path.getFileSystem(any[Configuration])).thenReturn(dfs)
+    val statuses =
+      Seq(
+        new LocatedFileStatus(
+          new FileStatus(0, false, 0, 100, 0,
+            new Path("file")), Array(new BlockLocation()))
+      )
+    when(dfs.listLocatedStatus(path)).thenReturn(new RemoteIterator[LocatedFileStatus] {
+      val iter = statuses.toIterator
+      override def hasNext: Boolean = iter.hasNext
+      override def next(): LocatedFileStatus = iter.next
+    })
+    val fileIndex = new TestInMemoryFileIndex(spark, path)
+    assert(fileIndex.leafFileStatuses.toSeq == statuses)
+  }
 }
 
 object DeletionRaceFileSystem {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org