You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2020/03/17 21:24:19 UTC
[spark] branch master updated: [SPARK-31047][SQL] Improve file
listing for ViewFileSystem
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 4e4e08f [SPARK-31047][SQL] Improve file listing for ViewFileSystem
4e4e08f is described below
commit 4e4e08f372db888797fd23faca88ac02d9466d5a
Author: manuzhang <ow...@gmail.com>
AuthorDate: Tue Mar 17 14:23:28 2020 -0700
[SPARK-31047][SQL] Improve file listing for ViewFileSystem
### What changes were proposed in this pull request?
Use `listLocatedStatus` when `InMemoryFileIndex` is listing files from a `ViewFileSystem` which should delegate to that of `DistributedFileSystem`.
### Why are the changes needed?
When `ViewFileSystem` is used to manage several `DistributedFileSystem`, the change will improve performance of file listing, especially when there are many files.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Existing tests.
Closes #27801 from manuzhang/spark-31047.
Authored-by: manuzhang <ow...@gmail.com>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../execution/datasources/InMemoryFileIndex.scala | 3 ++-
.../sql/execution/datasources/FileIndexSuite.scala | 25 +++++++++++++++++++++-
2 files changed, 26 insertions(+), 2 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index cac2d6e..84160f3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -23,6 +23,7 @@ import scala.collection.mutable
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
+import org.apache.hadoop.fs.viewfs.ViewFileSystem
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
@@ -313,7 +314,7 @@ object InMemoryFileIndex extends Logging {
// to retrieve the file status with the file block location. The reason to still fallback
// to listStatus is because the default implementation would potentially throw a
// FileNotFoundException which is better handled by doing the lookups manually below.
- case _: DistributedFileSystem if !ignoreLocality =>
+ case (_: DistributedFileSystem | _: ViewFileSystem) if !ignoreLocality =>
val remoteIter = fs.listLocatedStatus(path)
new Iterator[LocatedFileStatus]() {
def next(): LocatedFileStatus = remoteIter.next
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index 553773e..ea15f18 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -22,7 +22,11 @@ import java.net.URI
import scala.collection.mutable
-import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path, RawLocalFileSystem}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path, RawLocalFileSystem, RemoteIterator}
+import org.apache.hadoop.fs.viewfs.ViewFileSystem
+import org.mockito.ArgumentMatchers.any
+import org.mockito.Mockito.{mock, when}
import org.apache.spark.SparkException
import org.apache.spark.metrics.source.HiveCatalogMetrics
@@ -465,6 +469,25 @@ class FileIndexSuite extends SharedSparkSession {
}
}
}
+
+ test("SPARK-31047 - Improve file listing for ViewFileSystem") {
+ val path = mock(classOf[Path])
+ val dfs = mock(classOf[ViewFileSystem])
+ when(path.getFileSystem(any[Configuration])).thenReturn(dfs)
+ val statuses =
+ Seq(
+ new LocatedFileStatus(
+ new FileStatus(0, false, 0, 100, 0,
+ new Path("file")), Array(new BlockLocation()))
+ )
+ when(dfs.listLocatedStatus(path)).thenReturn(new RemoteIterator[LocatedFileStatus] {
+ val iter = statuses.toIterator
+ override def hasNext: Boolean = iter.hasNext
+ override def next(): LocatedFileStatus = iter.next
+ })
+ val fileIndex = new TestInMemoryFileIndex(spark, path)
+ assert(fileIndex.leafFileStatuses.toSeq == statuses)
+ }
}
object DeletionRaceFileSystem {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org