Posted to commits@spark.apache.org by we...@apache.org on 2016/12/02 12:59:53 UTC

spark git commit: [SPARK-18679][SQL] Fix regression in file listing performance for non-catalog tables

Repository: spark
Updated Branches:
  refs/heads/master 2159bf8b2 -> 294163ee9


[SPARK-18679][SQL] Fix regression in file listing performance for non-catalog tables

## What changes were proposed in this pull request?

In Spark 2.1, ListingFileCatalog was significantly refactored (and renamed to InMemoryFileIndex). This introduced a regression where parallelism could only be applied at the very top of the directory tree. However, in many cases (e.g. `spark.read.parquet(topLevelDir)`), the top of the tree is only a single directory, so the whole listing ended up running serially.
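
For illustration, a minimal sketch of the affected scenario (the directory layout below is hypothetical):

    // Hypothetical layout: one root directory fanning out into many partition
    // subdirectories:
    //   /data/events/date=2016-01-01/part-00000.parquet
    //   /data/events/date=2016-01-02/part-00000.parquet
    //   ...
    // With parallelism possible only at the top of the tree, this read starts from a
    // single root path and therefore lists the whole tree serially on the driver.
    val df = spark.read.parquet("/data/events")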

This PR simplifies and fixes the parallel recursive listing code so that parallelism can be introduced at any level of the recursive descent (note that once we decide to list a sub-tree in parallel, that sub-tree is listed serially on the executors).
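
As a rough, self-contained sketch of the strategy, here is a toy analogue that walks the local filesystem, with a Scala parallel collection standing in for the Spark job (names such as AdaptiveListingSketch and allowParallel are mine, not from the patch; the real implementation is in the PartitioningAwareFileIndex diff below):

    import java.io.File

    // Toy analogue of the adaptive listing strategy; not the Spark implementation.
    // Assumes Scala 2.11/2.12, where .par is available without an extra module.
    object AdaptiveListingSketch {
      // Stand-in for sessionState.conf.parallelPartitionDiscoveryThreshold (see diff below).
      val threshold = 32

      def listLeafFiles(dir: File, allowParallel: Boolean): Seq[File] = {
        val children = Option(dir.listFiles()).map(_.toSeq).getOrElse(Seq.empty[File])
        val (dirs, files) = children.partition(_.isDirectory)
        val nested =
          if (allowParallel && dirs.size >= threshold) {
            // Wide fan-out: descend in parallel, then list each sub-tree serially
            // (allowParallel = false), mirroring serial listing on the executors.
            dirs.par.flatMap(d => listLeafFiles(d, allowParallel = false)).seq
          } else {
            // Narrow fan-out: stay serial here, but deeper levels may still go parallel.
            dirs.flatMap(d => listLeafFiles(d, allowParallel))
          }
        files ++ nested
      }
    }

Compared to the previous code, the key difference is that the parallel/serial decision is re-evaluated at each level of the descent rather than only once for the initial set of root paths.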

cc mallman cloud-fan

## How was this patch tested?

Checked the new parallel listing metric in the unit tests added in FileIndexSuite.
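
The assertion pattern, excerpted from the new FileIndexSuite below (topLevelDirs and expectedNumPar come from the test's setup loop):

    // Reset the counters, build an InMemoryFileIndex, then assert on the
    // parallel-listing job counter.
    HiveCatalogMetrics.reset()
    assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 0)
    new InMemoryFileIndex(spark, topLevelDirs, Map.empty, None)
    assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == expectedNumPar)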

Author: Eric Liang <ek...@databricks.com>

Closes #16112 from ericl/spark-18679.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/294163ee
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/294163ee
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/294163ee

Branch: refs/heads/master
Commit: 294163ee9319e4f7f6da1259839eb3c80bba25c2
Parents: 2159bf8
Author: Eric Liang <ek...@databricks.com>
Authored: Fri Dec 2 20:59:39 2016 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Fri Dec 2 20:59:39 2016 +0800

----------------------------------------------------------------------
 .../spark/metrics/source/StaticSources.scala    |   8 +
 .../PartitioningAwareFileIndex.scala            |  79 ++++----
 .../datasources/FileCatalogSuite.scala          | 135 -------------
 .../execution/datasources/FileIndexSuite.scala  | 188 +++++++++++++++++++
 4 files changed, 241 insertions(+), 169 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/294163ee/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala b/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala
index b433cd0..99ec786 100644
--- a/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala
@@ -91,6 +91,12 @@ object HiveCatalogMetrics extends Source {
   val METRIC_HIVE_CLIENT_CALLS = metricRegistry.counter(MetricRegistry.name("hiveClientCalls"))
 
   /**
+   * Tracks the total number of Spark jobs launched for parallel file listing.
+   */
+  val METRIC_PARALLEL_LISTING_JOB_COUNT = metricRegistry.counter(
+    MetricRegistry.name("parallelListingJobCount"))
+
+  /**
    * Resets the values of all metrics to zero. This is useful in tests.
    */
   def reset(): Unit = {
@@ -98,6 +104,7 @@ object HiveCatalogMetrics extends Source {
     METRIC_FILES_DISCOVERED.dec(METRIC_FILES_DISCOVERED.getCount())
     METRIC_FILE_CACHE_HITS.dec(METRIC_FILE_CACHE_HITS.getCount())
     METRIC_HIVE_CLIENT_CALLS.dec(METRIC_HIVE_CLIENT_CALLS.getCount())
+    METRIC_PARALLEL_LISTING_JOB_COUNT.dec(METRIC_PARALLEL_LISTING_JOB_COUNT.getCount())
   }
 
   // clients can use these to avoid classloader issues with the codahale classes
@@ -105,4 +112,5 @@ object HiveCatalogMetrics extends Source {
   def incrementFilesDiscovered(n: Int): Unit = METRIC_FILES_DISCOVERED.inc(n)
   def incrementFileCacheHits(n: Int): Unit = METRIC_FILE_CACHE_HITS.inc(n)
   def incrementHiveClientCalls(n: Int): Unit = METRIC_HIVE_CLIENT_CALLS.inc(n)
+  def incrementParallelListingJobCount(n: Int): Unit = METRIC_PARALLEL_LISTING_JOB_COUNT.inc(n)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/294163ee/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index 705a1e3..825a0f7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -249,12 +249,9 @@ abstract class PartitioningAwareFileIndex(
           pathsToFetch += path
       }
     }
-    val discovered = if (pathsToFetch.length >=
-        sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
-      PartitioningAwareFileIndex.listLeafFilesInParallel(pathsToFetch, hadoopConf, sparkSession)
-    } else {
-      PartitioningAwareFileIndex.listLeafFilesInSerial(pathsToFetch, hadoopConf)
-    }
+    val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass))
+    val discovered = PartitioningAwareFileIndex.bulkListLeafFiles(
+      pathsToFetch, hadoopConf, filter, sparkSession)
     discovered.foreach { case (path, leafFiles) =>
       HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size)
       fileStatusCache.putLeafFiles(path, leafFiles.toArray)
@@ -286,31 +283,28 @@ object PartitioningAwareFileIndex extends Logging {
       blockLocations: Array[SerializableBlockLocation])
 
   /**
-   * List a collection of path recursively.
-   */
-  private def listLeafFilesInSerial(
-      paths: Seq[Path],
-      hadoopConf: Configuration): Seq[(Path, Seq[FileStatus])] = {
-    // Dummy jobconf to get to the pathFilter defined in configuration
-    val jobConf = new JobConf(hadoopConf, this.getClass)
-    val filter = FileInputFormat.getInputPathFilter(jobConf)
-
-    paths.map { path =>
-      val fs = path.getFileSystem(hadoopConf)
-      (path, listLeafFiles0(fs, path, filter))
-    }
-  }
-
-  /**
-   * List a collection of path recursively in parallel (using Spark executors).
-   * Each task launched will use [[listLeafFilesInSerial]] to list.
+   * Lists a collection of paths recursively. Picks the listing strategy adaptively depending
+   * on the number of paths to list.
+   *
+   * This may only be called on the driver.
+   *
+   * @return for each input path, the set of discovered files for the path
    */
-  private def listLeafFilesInParallel(
+  private def bulkListLeafFiles(
       paths: Seq[Path],
       hadoopConf: Configuration,
+      filter: PathFilter,
       sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = {
-    assert(paths.size >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold)
+
+    // Short-circuits parallel listing when serial listing is likely to be faster.
+    if (paths.size < sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
+      return paths.map { path =>
+        (path, listLeafFiles(path, hadoopConf, filter, Some(sparkSession)))
+      }
+    }
+
     logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}")
+    HiveCatalogMetrics.incrementParallelListingJobCount(1)
 
     val sparkContext = sparkSession.sparkContext
     val serializableConfiguration = new SerializableConfiguration(hadoopConf)
@@ -324,9 +318,11 @@ object PartitioningAwareFileIndex extends Logging {
 
     val statusMap = sparkContext
       .parallelize(serializedPaths, numParallelism)
-      .mapPartitions { paths =>
+      .mapPartitions { pathStrings =>
         val hadoopConf = serializableConfiguration.value
-        listLeafFilesInSerial(paths.map(new Path(_)).toSeq, hadoopConf).iterator
+        pathStrings.map(new Path(_)).toSeq.map { path =>
+          (path, listLeafFiles(path, hadoopConf, filter, None))
+        }.iterator
       }.map { case (path, statuses) =>
         val serializableStatuses = statuses.map { status =>
           // Turn FileStatus into SerializableFileStatus so we can send it back to the driver
@@ -374,11 +370,20 @@ object PartitioningAwareFileIndex extends Logging {
   }
 
   /**
-   * List a single path, provided as a FileStatus, in serial.
+   * Lists a single filesystem path recursively. If a SparkSession object is specified, this
+   * function may launch Spark jobs to parallelize listing.
+   *
+   * If sessionOpt is None, this may be called on executors.
+   *
+   * @return all children of path that match the specified filter.
    */
-  private def listLeafFiles0(
-      fs: FileSystem, path: Path, filter: PathFilter): Seq[FileStatus] = {
+  private def listLeafFiles(
+      path: Path,
+      hadoopConf: Configuration,
+      filter: PathFilter,
+      sessionOpt: Option[SparkSession]): Seq[FileStatus] = {
     logTrace(s"Listing $path")
+    val fs = path.getFileSystem(hadoopConf)
     val name = path.getName.toLowerCase
     if (shouldFilterOut(name)) {
       Seq.empty[FileStatus]
@@ -393,9 +398,15 @@ object PartitioningAwareFileIndex extends Logging {
       }
 
       val allLeafStatuses = {
-        val (dirs, files) = statuses.partition(_.isDirectory)
-        val stats = files ++ dirs.flatMap(dir => listLeafFiles0(fs, dir.getPath, filter))
-        if (filter != null) stats.filter(f => filter.accept(f.getPath)) else stats
+        val (dirs, topLevelFiles) = statuses.partition(_.isDirectory)
+        val nestedFiles: Seq[FileStatus] = sessionOpt match {
+          case Some(session) =>
+            bulkListLeafFiles(dirs.map(_.getPath), hadoopConf, filter, session).flatMap(_._2)
+          case _ =>
+            dirs.flatMap(dir => listLeafFiles(dir.getPath, hadoopConf, filter, sessionOpt))
+        }
+        val allFiles = topLevelFiles ++ nestedFiles
+        if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles
       }
 
       allLeafStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {

http://git-wip-us.apache.org/repos/asf/spark/blob/294163ee/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
deleted file mode 100644
index 56df1fa..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources
-
-import java.io.File
-import java.net.URI
-
-import scala.collection.mutable
-import scala.language.reflectiveCalls
-
-import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
-
-import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.test.SharedSQLContext
-
-class FileIndexSuite extends SharedSQLContext {
-
-  test("InMemoryFileIndex: leaf files are qualified paths") {
-    withTempDir { dir =>
-      val file = new File(dir, "text.txt")
-      stringToFile(file, "text")
-
-      val path = new Path(file.getCanonicalPath)
-      val catalog = new InMemoryFileIndex(spark, Seq(path), Map.empty, None) {
-        def leafFilePaths: Seq[Path] = leafFiles.keys.toSeq
-        def leafDirPaths: Seq[Path] = leafDirToChildrenFiles.keys.toSeq
-      }
-      assert(catalog.leafFilePaths.forall(p => p.toString.startsWith("file:/")))
-      assert(catalog.leafDirPaths.forall(p => p.toString.startsWith("file:/")))
-    }
-  }
-
-  test("InMemoryFileIndex: input paths are converted to qualified paths") {
-    withTempDir { dir =>
-      val file = new File(dir, "text.txt")
-      stringToFile(file, "text")
-
-      val unqualifiedDirPath = new Path(dir.getCanonicalPath)
-      val unqualifiedFilePath = new Path(file.getCanonicalPath)
-      require(!unqualifiedDirPath.toString.contains("file:"))
-      require(!unqualifiedFilePath.toString.contains("file:"))
-
-      val fs = unqualifiedDirPath.getFileSystem(sparkContext.hadoopConfiguration)
-      val qualifiedFilePath = fs.makeQualified(new Path(file.getCanonicalPath))
-      require(qualifiedFilePath.toString.startsWith("file:"))
-
-      val catalog1 = new InMemoryFileIndex(
-        spark, Seq(unqualifiedDirPath), Map.empty, None)
-      assert(catalog1.allFiles.map(_.getPath) === Seq(qualifiedFilePath))
-
-      val catalog2 = new InMemoryFileIndex(
-        spark, Seq(unqualifiedFilePath), Map.empty, None)
-      assert(catalog2.allFiles.map(_.getPath) === Seq(qualifiedFilePath))
-
-    }
-  }
-
-  test("InMemoryFileIndex: folders that don't exist don't throw exceptions") {
-    withTempDir { dir =>
-      val deletedFolder = new File(dir, "deleted")
-      assert(!deletedFolder.exists())
-      val catalog1 = new InMemoryFileIndex(
-        spark, Seq(new Path(deletedFolder.getCanonicalPath)), Map.empty, None)
-      // doesn't throw an exception
-      assert(catalog1.listLeafFiles(catalog1.rootPaths).isEmpty)
-    }
-  }
-
-  test("PartitioningAwareFileIndex - file filtering") {
-    assert(!PartitioningAwareFileIndex.shouldFilterOut("abcd"))
-    assert(PartitioningAwareFileIndex.shouldFilterOut(".ab"))
-    assert(PartitioningAwareFileIndex.shouldFilterOut("_cd"))
-    assert(!PartitioningAwareFileIndex.shouldFilterOut("_metadata"))
-    assert(!PartitioningAwareFileIndex.shouldFilterOut("_common_metadata"))
-    assert(PartitioningAwareFileIndex.shouldFilterOut("_ab_metadata"))
-    assert(PartitioningAwareFileIndex.shouldFilterOut("_cd_common_metadata"))
-  }
-
-  test("SPARK-17613 - PartitioningAwareFileIndex: base path w/o '/' at end") {
-    class MockCatalog(
-      override val rootPaths: Seq[Path])
-      extends PartitioningAwareFileIndex(spark, Map.empty, None) {
-
-      override def refresh(): Unit = {}
-
-      override def leafFiles: mutable.LinkedHashMap[Path, FileStatus] = mutable.LinkedHashMap(
-        new Path("mockFs://some-bucket/file1.json") -> new FileStatus()
-      )
-
-      override def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = Map(
-        new Path("mockFs://some-bucket/") -> Array(new FileStatus())
-      )
-
-      override def partitionSpec(): PartitionSpec = {
-        PartitionSpec.emptySpec
-      }
-    }
-
-    withSQLConf(
-        "fs.mockFs.impl" -> classOf[FakeParentPathFileSystem].getName,
-        "fs.mockFs.impl.disable.cache" -> "true") {
-      val pathWithSlash = new Path("mockFs://some-bucket/")
-      assert(pathWithSlash.getParent === null)
-      val pathWithoutSlash = new Path("mockFs://some-bucket")
-      assert(pathWithoutSlash.getParent === null)
-      val catalog1 = new MockCatalog(Seq(pathWithSlash))
-      val catalog2 = new MockCatalog(Seq(pathWithoutSlash))
-      assert(catalog1.allFiles().nonEmpty)
-      assert(catalog2.allFiles().nonEmpty)
-    }
-  }
-}
-
-class FakeParentPathFileSystem extends RawLocalFileSystem {
-  override def getScheme: String = "mockFs"
-
-  override def getUri: URI = {
-    URI.create("mockFs://some-bucket")
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/294163ee/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
new file mode 100644
index 0000000..b7a472b
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.File
+import java.net.URI
+
+import scala.collection.mutable
+import scala.language.reflectiveCalls
+
+import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
+
+import org.apache.spark.metrics.source.HiveCatalogMetrics
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.test.SharedSQLContext
+
+class FileIndexSuite extends SharedSQLContext {
+
+  test("InMemoryFileIndex: leaf files are qualified paths") {
+    withTempDir { dir =>
+      val file = new File(dir, "text.txt")
+      stringToFile(file, "text")
+
+      val path = new Path(file.getCanonicalPath)
+      val catalog = new InMemoryFileIndex(spark, Seq(path), Map.empty, None) {
+        def leafFilePaths: Seq[Path] = leafFiles.keys.toSeq
+        def leafDirPaths: Seq[Path] = leafDirToChildrenFiles.keys.toSeq
+      }
+      assert(catalog.leafFilePaths.forall(p => p.toString.startsWith("file:/")))
+      assert(catalog.leafDirPaths.forall(p => p.toString.startsWith("file:/")))
+    }
+  }
+
+  test("InMemoryFileIndex: input paths are converted to qualified paths") {
+    withTempDir { dir =>
+      val file = new File(dir, "text.txt")
+      stringToFile(file, "text")
+
+      val unqualifiedDirPath = new Path(dir.getCanonicalPath)
+      val unqualifiedFilePath = new Path(file.getCanonicalPath)
+      require(!unqualifiedDirPath.toString.contains("file:"))
+      require(!unqualifiedFilePath.toString.contains("file:"))
+
+      val fs = unqualifiedDirPath.getFileSystem(sparkContext.hadoopConfiguration)
+      val qualifiedFilePath = fs.makeQualified(new Path(file.getCanonicalPath))
+      require(qualifiedFilePath.toString.startsWith("file:"))
+
+      val catalog1 = new InMemoryFileIndex(
+        spark, Seq(unqualifiedDirPath), Map.empty, None)
+      assert(catalog1.allFiles.map(_.getPath) === Seq(qualifiedFilePath))
+
+      val catalog2 = new InMemoryFileIndex(
+        spark, Seq(unqualifiedFilePath), Map.empty, None)
+      assert(catalog2.allFiles.map(_.getPath) === Seq(qualifiedFilePath))
+
+    }
+  }
+
+  test("InMemoryFileIndex: folders that don't exist don't throw exceptions") {
+    withTempDir { dir =>
+      val deletedFolder = new File(dir, "deleted")
+      assert(!deletedFolder.exists())
+      val catalog1 = new InMemoryFileIndex(
+        spark, Seq(new Path(deletedFolder.getCanonicalPath)), Map.empty, None)
+      // doesn't throw an exception
+      assert(catalog1.listLeafFiles(catalog1.rootPaths).isEmpty)
+    }
+  }
+
+  test("PartitioningAwareFileIndex listing parallelized with many top level dirs") {
+    for ((scale, expectedNumPar) <- Seq((10, 0), (50, 1))) {
+      withTempDir { dir =>
+        val topLevelDirs = (1 to scale).map { i =>
+          val tmp = new File(dir, s"foo=$i.txt")
+          tmp.mkdir()
+          new Path(tmp.getCanonicalPath)
+        }
+        HiveCatalogMetrics.reset()
+        assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 0)
+        new InMemoryFileIndex(spark, topLevelDirs, Map.empty, None)
+        assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == expectedNumPar)
+      }
+    }
+  }
+
+  test("PartitioningAwareFileIndex listing parallelized with large child dirs") {
+    for ((scale, expectedNumPar) <- Seq((10, 0), (50, 1))) {
+      withTempDir { dir =>
+        for (i <- 1 to scale) {
+          new File(dir, s"foo=$i.txt").mkdir()
+        }
+        HiveCatalogMetrics.reset()
+        assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 0)
+        new InMemoryFileIndex(spark, Seq(new Path(dir.getCanonicalPath)), Map.empty, None)
+        assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == expectedNumPar)
+      }
+    }
+  }
+
+  test("PartitioningAwareFileIndex listing parallelized with large, deeply nested child dirs") {
+    for ((scale, expectedNumPar) <- Seq((10, 0), (50, 4))) {
+      withTempDir { dir =>
+        for (i <- 1 to 2) {
+          val subdirA = new File(dir, s"a=$i")
+          subdirA.mkdir()
+          for (j <- 1 to 2) {
+            val subdirB = new File(subdirA, s"b=$j")
+            subdirB.mkdir()
+            for (k <- 1 to scale) {
+              new File(subdirB, s"foo=$k.txt").mkdir()
+            }
+          }
+        }
+        HiveCatalogMetrics.reset()
+        assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 0)
+        new InMemoryFileIndex(spark, Seq(new Path(dir.getCanonicalPath)), Map.empty, None)
+        assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == expectedNumPar)
+      }
+    }
+  }
+
+  test("PartitioningAwareFileIndex - file filtering") {
+    assert(!PartitioningAwareFileIndex.shouldFilterOut("abcd"))
+    assert(PartitioningAwareFileIndex.shouldFilterOut(".ab"))
+    assert(PartitioningAwareFileIndex.shouldFilterOut("_cd"))
+    assert(!PartitioningAwareFileIndex.shouldFilterOut("_metadata"))
+    assert(!PartitioningAwareFileIndex.shouldFilterOut("_common_metadata"))
+    assert(PartitioningAwareFileIndex.shouldFilterOut("_ab_metadata"))
+    assert(PartitioningAwareFileIndex.shouldFilterOut("_cd_common_metadata"))
+  }
+
+  test("SPARK-17613 - PartitioningAwareFileIndex: base path w/o '/' at end") {
+    class MockCatalog(
+      override val rootPaths: Seq[Path])
+      extends PartitioningAwareFileIndex(spark, Map.empty, None) {
+
+      override def refresh(): Unit = {}
+
+      override def leafFiles: mutable.LinkedHashMap[Path, FileStatus] = mutable.LinkedHashMap(
+        new Path("mockFs://some-bucket/file1.json") -> new FileStatus()
+      )
+
+      override def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = Map(
+        new Path("mockFs://some-bucket/") -> Array(new FileStatus())
+      )
+
+      override def partitionSpec(): PartitionSpec = {
+        PartitionSpec.emptySpec
+      }
+    }
+
+    withSQLConf(
+        "fs.mockFs.impl" -> classOf[FakeParentPathFileSystem].getName,
+        "fs.mockFs.impl.disable.cache" -> "true") {
+      val pathWithSlash = new Path("mockFs://some-bucket/")
+      assert(pathWithSlash.getParent === null)
+      val pathWithoutSlash = new Path("mockFs://some-bucket")
+      assert(pathWithoutSlash.getParent === null)
+      val catalog1 = new MockCatalog(Seq(pathWithSlash))
+      val catalog2 = new MockCatalog(Seq(pathWithoutSlash))
+      assert(catalog1.allFiles().nonEmpty)
+      assert(catalog2.allFiles().nonEmpty)
+    }
+  }
+}
+
+class FakeParentPathFileSystem extends RawLocalFileSystem {
+  override def getScheme: String = "mockFs"
+
+  override def getUri: URI = {
+    URI.create("mockFs://some-bucket")
+  }
+}

