Posted to commits@spark.apache.org by rx...@apache.org on 2016/10/30 20:14:51 UTC

[1/2] spark git commit: [SPARK-18103][SQL] Rename *FileCatalog to *FileIndex

Repository: spark
Updated Branches:
  refs/heads/master 3ad99f166 -> 90d3b91f4


http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index d1de863..624ab74 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -200,7 +200,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
       val rootPaths: Seq[Path] = if (lazyPruningEnabled) {
         Seq(metastoreRelation.hiveQlTable.getDataLocation)
       } else {
-        // By convention (for example, see TableFileCatalog), the definition of a
+        // By convention (for example, see CatalogFileIndex), the definition of a
         // partitioned table's paths depends on whether that table has any actual partitions.
         // Partitioned tables without partitions use the location of the table's base path.
         // Partitioned tables with partitions use the locations of those partitions' data
@@ -227,7 +227,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
       val logicalRelation = cached.getOrElse {
         val sizeInBytes = metastoreRelation.statistics.sizeInBytes.toLong
         val fileCatalog = {
-          val catalog = new TableFileCatalog(
+          val catalog = new CatalogFileIndex(
             sparkSession, metastoreRelation.catalogTable, sizeInBytes)
           if (lazyPruningEnabled) {
             catalog

http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index ecdf4f1..fc35304 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.{AnalysisException, Dataset, QueryTest, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, TableFileCatalog}
+import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.test.SQLTestUtils
@@ -321,17 +321,17 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
     sql("DROP TABLE cachedTable")
   }
 
-  test("cache a table using TableFileCatalog") {
+  test("cache a table using CatalogFileIndex") {
     withTable("test") {
       sql("CREATE TABLE test(i int) PARTITIONED BY (p int) STORED AS parquet")
       val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test")
-      val tableFileCatalog = new TableFileCatalog(spark, tableMeta, 0)
+      val catalogFileIndex = new CatalogFileIndex(spark, tableMeta, 0)
 
       val dataSchema = StructType(tableMeta.schema.filterNot { f =>
         tableMeta.partitionColumnNames.contains(f.name)
       })
       val relation = HadoopFsRelation(
-        location = tableFileCatalog,
+        location = catalogFileIndex,
         partitionSchema = tableMeta.partitionSchema,
         dataSchema = dataSchema,
         bucketSpec = None,
@@ -343,7 +343,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
 
       assert(spark.sharedState.cacheManager.lookupCachedData(plan).isDefined)
 
-      val sameCatalog = new TableFileCatalog(spark, tableMeta, 0)
+      val sameCatalog = new CatalogFileIndex(spark, tableMeta, 0)
       val sameRelation = HadoopFsRelation(
         location = sameCatalog,
         partitionSchema = tableMeta.partitionSchema,

http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
index 476383a..d8e31c4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
@@ -256,7 +256,7 @@ class PartitionedTablePerfStatsSuite
           // of doing plan cache validation based on the entire partition set.
           HiveCatalogMetrics.reset()
           assert(spark.sql("select * from test where partCol1 = 999").count() == 0)
-          // 5 from table resolution, another 5 from ListingFileCatalog
+          // 5 from table resolution, another 5 from InMemoryFileIndex
           assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 10)
           assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
index 59639aa..cdbc26c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PruneFileSourcePartitions, TableFileCatalog}
+import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, LogicalRelation, PruneFileSourcePartitions}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.test.SQLTestUtils
@@ -45,13 +45,13 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te
             |LOCATION '${dir.getAbsolutePath}'""".stripMargin)
 
         val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test")
-        val tableFileCatalog = new TableFileCatalog(spark, tableMeta, 0)
+        val catalogFileIndex = new CatalogFileIndex(spark, tableMeta, 0)
 
         val dataSchema = StructType(tableMeta.schema.filterNot { f =>
           tableMeta.partitionColumnNames.contains(f.name)
         })
         val relation = HadoopFsRelation(
-          location = tableFileCatalog,
+          location = catalogFileIndex,
           partitionSchema = tableMeta.partitionSchema,
           dataSchema = dataSchema,
           bucketSpec = None,




[2/2] spark git commit: [SPARK-18103][SQL] Rename *FileCatalog to *FileIndex

Posted by rx...@apache.org.
[SPARK-18103][SQL] Rename *FileCatalog to *FileIndex

## What changes were proposed in this pull request?

To reduce the number of SQL components named *Catalog, this patch renames *FileCatalog to *FileIndex. A FileIndex is responsible for returning the list of partitions / files to scan, given a filtering expression.

```
TableFileCatalog => CatalogFileIndex
FileCatalog => FileIndex
ListingFileCatalog => InMemoryFileIndex
MetadataLogFileCatalog => MetadataLogFileIndex
PrunedTableFileCatalog => PrunedInMemoryFileIndex
```
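
As a rough, non-authoritative sketch of how the renamed interface is used (assembled from the FileIndex trait and the CachedTableSuite / PruneFileSourcePartitionsSuite changes in this diff; the table `default.test` and access to Spark-internal APIs such as `sharedState.externalCatalog` are assumed):

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, FileIndex}

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

// Look up table metadata from the external catalog (internal API, used here the
// same way the test suites in this diff use it), then wrap it in a FileIndex.
val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test")
val index: FileIndex = new CatalogFileIndex(spark, tableMeta, 0L)

// A FileIndex answers "which partitions / files should be scanned?" for a set of
// partition-pruning filters; Nil means no pruning, so everything is listed.
val partitions = index.listFiles(Nil)
partitions.foreach { dir =>
  println(s"partition values: ${dir.values}; " +
    s"files: ${dir.files.map(_.getPath).mkString(", ")}")
}
```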

cc yhuai marmbrus

## How was this patch tested?

N/A

Author: Eric Liang <ek...@databricks.com>
Author: Eric Liang <ek...@gmail.com>

Closes #15634 from ericl/rename-file-provider.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/90d3b91f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/90d3b91f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/90d3b91f

Branch: refs/heads/master
Commit: 90d3b91f4cb59d84fea7105d54ef8c87a7d5c6a2
Parents: 3ad99f1
Author: Eric Liang <ek...@databricks.com>
Authored: Sun Oct 30 13:14:45 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Sun Oct 30 13:14:45 2016 -0700

----------------------------------------------------------------------
 .../spark/metrics/source/StaticSources.scala    |   2 +-
 .../spark/sql/execution/CacheManager.scala      |   2 +-
 .../datasources/CatalogFileIndex.scala          | 110 +++++
 .../sql/execution/datasources/DataSource.scala  |  10 +-
 .../sql/execution/datasources/FileCatalog.scala |  70 ---
 .../sql/execution/datasources/FileIndex.scala   |  70 +++
 .../datasources/HadoopFsRelation.scala          |   4 +-
 .../datasources/InMemoryFileIndex.scala         |  87 ++++
 .../datasources/ListingFileCatalog.scala        |  87 ----
 .../PartitioningAwareFileCatalog.scala          | 437 -------------------
 .../PartitioningAwareFileIndex.scala            | 437 +++++++++++++++++++
 .../datasources/PruneFileSourcePartitions.scala |   6 +-
 .../datasources/TableFileCatalog.scala          | 110 -----
 .../streaming/CompactibleFileStreamLog.scala    |   4 +-
 .../execution/streaming/FileStreamSource.scala  |   4 +-
 .../streaming/MetadataLogFileCatalog.scala      |   6 +-
 .../datasources/FileCatalogSuite.scala          |  36 +-
 .../datasources/FileSourceStrategySuite.scala   |   2 +-
 .../ParquetPartitionDiscoverySuite.scala        |   2 +-
 .../sql/streaming/FileStreamSinkSuite.scala     |   6 +-
 .../sql/streaming/FileStreamSourceSuite.scala   |   2 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |   4 +-
 .../spark/sql/hive/CachedTableSuite.scala       |  10 +-
 .../hive/PartitionedTablePerfStatsSuite.scala   |   2 +-
 .../PruneFileSourcePartitionsSuite.scala        |   6 +-
 25 files changed, 758 insertions(+), 758 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala b/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala
index b54885b..3f7cfd9 100644
--- a/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala
@@ -76,7 +76,7 @@ object HiveCatalogMetrics extends Source {
   val METRIC_PARTITIONS_FETCHED = metricRegistry.counter(MetricRegistry.name("partitionsFetched"))
 
   /**
-   * Tracks the total number of files discovered off of the filesystem by ListingFileCatalog.
+   * Tracks the total number of files discovered off of the filesystem by InMemoryFileIndex.
    */
   val METRIC_FILES_DISCOVERED = metricRegistry.counter(MetricRegistry.name("filesDiscovered"))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index fb72c67..526623a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -177,7 +177,7 @@ class CacheManager extends Logging {
 
   /**
    * Traverses a given `plan` and searches for the occurrences of `qualifiedPath` in the
-   * [[org.apache.spark.sql.execution.datasources.FileCatalog]] of any [[HadoopFsRelation]] nodes
+   * [[org.apache.spark.sql.execution.datasources.FileIndex]] of any [[HadoopFsRelation]] nodes
    * in the plan. If found, we refresh the metadata and return true. Otherwise, this method returns
    * false.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
new file mode 100644
index 0000000..092aabc
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types.StructType
+
+
+/**
+ * A [[FileIndex]] for a metastore catalog table.
+ *
+ * @param sparkSession a [[SparkSession]]
+ * @param table the metadata of the table
+ * @param sizeInBytes the table's data size in bytes
+ */
+class CatalogFileIndex(
+    sparkSession: SparkSession,
+    val table: CatalogTable,
+    override val sizeInBytes: Long) extends FileIndex {
+
+  protected val hadoopConf = sparkSession.sessionState.newHadoopConf
+
+  private val fileStatusCache = FileStatusCache.newCache(sparkSession)
+
+  assert(table.identifier.database.isDefined,
+    "The table identifier must be qualified in CatalogFileIndex")
+
+  private val baseLocation = table.storage.locationUri
+
+  override def partitionSchema: StructType = table.partitionSchema
+
+  override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq
+
+  override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = {
+    filterPartitions(filters).listFiles(Nil)
+  }
+
+  override def refresh(): Unit = fileStatusCache.invalidateAll()
+
+  /**
+   * Returns a [[InMemoryFileIndex]] for this table restricted to the subset of partitions
+   * specified by the given partition-pruning filters.
+   *
+   * @param filters partition-pruning filters
+   */
+  def filterPartitions(filters: Seq[Expression]): InMemoryFileIndex = {
+    if (table.partitionColumnNames.nonEmpty) {
+      val selectedPartitions = sparkSession.sessionState.catalog.listPartitionsByFilter(
+        table.identifier, filters)
+      val partitions = selectedPartitions.map { p =>
+        PartitionPath(p.toRow(partitionSchema), p.storage.locationUri.get)
+      }
+      val partitionSpec = PartitionSpec(partitionSchema, partitions)
+      new PrunedInMemoryFileIndex(
+        sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec)
+    } else {
+      new InMemoryFileIndex(sparkSession, rootPaths, table.storage.properties, None)
+    }
+  }
+
+  override def inputFiles: Array[String] = filterPartitions(Nil).inputFiles
+
+  // `CatalogFileIndex` may be a member of `HadoopFsRelation`, `HadoopFsRelation` may be a member
+  // of `LogicalRelation`, and `LogicalRelation` may be used as the cache key. So we need to
+  // implement `equals` and `hashCode` here, to make it work with cache lookup.
+  override def equals(o: Any): Boolean = o match {
+    case other: CatalogFileIndex => this.table.identifier == other.table.identifier
+    case _ => false
+  }
+
+  override def hashCode(): Int = table.identifier.hashCode()
+}
+
+/**
+ * An override of the standard HDFS listing based catalog, that overrides the partition spec with
+ * the information from the metastore.
+ *
+ * @param tableBasePath The default base path of the Hive metastore table
+ * @param partitionSpec The partition specifications from Hive metastore
+ */
+private class PrunedInMemoryFileIndex(
+    sparkSession: SparkSession,
+    tableBasePath: Path,
+    fileStatusCache: FileStatusCache,
+    override val partitionSpec: PartitionSpec)
+  extends InMemoryFileIndex(
+    sparkSession,
+    partitionSpec.partitions.map(_.path),
+    Map.empty,
+    Some(partitionSpec.partitionColumns),
+    fileStatusCache)

http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 5b8f05a..9961098 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -202,7 +202,7 @@ case class DataSource(
         val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
         SparkHadoopUtil.get.globPathIfNecessary(qualified)
       }.toArray
-      val fileCatalog = new ListingFileCatalog(sparkSession, globbedPaths, options, None)
+      val fileCatalog = new InMemoryFileIndex(sparkSession, globbedPaths, options, None)
       val partitionSchema = fileCatalog.partitionSpec().partitionColumns
       val inferred = format.inferSchema(
         sparkSession,
@@ -364,7 +364,7 @@ case class DataSource(
       case (format: FileFormat, _)
           if hasMetadata(caseInsensitiveOptions.get("path").toSeq ++ paths) =>
         val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head)
-        val fileCatalog = new MetadataLogFileCatalog(sparkSession, basePath)
+        val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath)
         val dataSchema = userSpecifiedSchema.orElse {
           format.inferSchema(
             sparkSession,
@@ -417,12 +417,12 @@ case class DataSource(
 
         val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions &&
             catalogTable.isDefined && catalogTable.get.partitionProviderIsHive) {
-          new TableFileCatalog(
+          new CatalogFileIndex(
             sparkSession,
             catalogTable.get,
             catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(0L))
         } else {
-          new ListingFileCatalog(
+          new InMemoryFileIndex(
             sparkSession, globbedPaths, options, partitionSchema)
         }
 
@@ -433,7 +433,7 @@ case class DataSource(
           format.inferSchema(
             sparkSession,
             caseInsensitiveOptions,
-            fileCatalog.asInstanceOf[ListingFileCatalog].allFiles())
+            fileCatalog.asInstanceOf[InMemoryFileIndex].allFiles())
         }.getOrElse {
           throw new AnalysisException(
             s"Unable to infer schema for $format at ${allPaths.take(2).mkString(",")}. " +

http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala
deleted file mode 100644
index dba6462..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources
-
-import org.apache.hadoop.fs._
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.types.StructType
-
-/**
- * A collection of data files from a partitioned relation, along with the partition values in the
- * form of an [[InternalRow]].
- */
-case class PartitionDirectory(values: InternalRow, files: Seq[FileStatus])
-
-/**
- * An interface for objects capable of enumerating the root paths of a relation as well as the
- * partitions of a relation subject to some pruning expressions.
- */
-trait FileCatalog {
-
-  /**
-   * Returns the list of root input paths from which the catalog will get files. There may be a
-   * single root path from which partitions are discovered, or individual partitions may be
-   * specified by each path.
-   */
-  def rootPaths: Seq[Path]
-
-  /**
-   * Returns all valid files grouped into partitions when the data is partitioned. If the data is
-   * unpartitioned, this will return a single partition with no partition values.
-   *
-   * @param filters The filters used to prune which partitions are returned.  These filters must
-   *                only refer to partition columns and this method will only return files
-   *                where these predicates are guaranteed to evaluate to `true`.  Thus, these
-   *                filters will not need to be evaluated again on the returned data.
-   */
-  def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory]
-
-  /**
-   * Returns the list of files that will be read when scanning this relation. This call may be
-   * very expensive for large tables.
-   */
-  def inputFiles: Array[String]
-
-  /** Refresh any cached file listings */
-  def refresh(): Unit
-
-  /** Sum of table file sizes, in bytes */
-  def sizeInBytes: Long
-
-  /** Schema of the partitioning columns, or the empty schema if the table is not partitioned. */
-  def partitionSchema: StructType
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala
new file mode 100644
index 0000000..277223d
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.hadoop.fs._
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types.StructType
+
+/**
+ * A collection of data files from a partitioned relation, along with the partition values in the
+ * form of an [[InternalRow]].
+ */
+case class PartitionDirectory(values: InternalRow, files: Seq[FileStatus])
+
+/**
+ * An interface for objects capable of enumerating the root paths of a relation as well as the
+ * partitions of a relation subject to some pruning expressions.
+ */
+trait FileIndex {
+
+  /**
+   * Returns the list of root input paths from which the catalog will get files. There may be a
+   * single root path from which partitions are discovered, or individual partitions may be
+   * specified by each path.
+   */
+  def rootPaths: Seq[Path]
+
+  /**
+   * Returns all valid files grouped into partitions when the data is partitioned. If the data is
+   * unpartitioned, this will return a single partition with no partition values.
+   *
+   * @param filters The filters used to prune which partitions are returned.  These filters must
+   *                only refer to partition columns and this method will only return files
+   *                where these predicates are guaranteed to evaluate to `true`.  Thus, these
+   *                filters will not need to be evaluated again on the returned data.
+   */
+  def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory]
+
+  /**
+   * Returns the list of files that will be read when scanning this relation. This call may be
+   * very expensive for large tables.
+   */
+  def inputFiles: Array[String]
+
+  /** Refresh any cached file listings */
+  def refresh(): Unit
+
+  /** Sum of table file sizes, in bytes */
+  def sizeInBytes: Long
+
+  /** Schema of the partitioning columns, or the empty schema if the table is not partitioned. */
+  def partitionSchema: StructType
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
index afad889..014abd4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.types.StructType
  * Acts as a container for all of the metadata required to read from a datasource. All discovery,
  * resolution and merging logic for schemas and partitions has been removed.
  *
- * @param location A [[FileCatalog]] that can enumerate the locations of all the files that
+ * @param location A [[FileIndex]] that can enumerate the locations of all the files that
  *                 comprise this relation.
  * @param partitionSchema The schema of the columns (if any) that are used to partition the relation
  * @param dataSchema The schema of any remaining columns.  Note that if any partition columns are
@@ -38,7 +38,7 @@ import org.apache.spark.sql.types.StructType
  * @param options Configuration used when reading / writing data.
  */
 case class HadoopFsRelation(
-    location: FileCatalog,
+    location: FileIndex,
     partitionSchema: StructType,
     dataSchema: StructType,
     bucketSpec: Option[BucketSpec],

http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
new file mode 100644
index 0000000..7531f0a
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import scala.collection.mutable
+
+import org.apache.hadoop.fs._
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.StructType
+
+
+/**
+ * A [[FileIndex]] that generates the list of files to process by recursively listing all the
+ * files present in `paths`.
+ *
+ * @param rootPaths the list of root table paths to scan
+ * @param parameters as set of options to control discovery
+ * @param partitionSchema an optional partition schema that will be use to provide types for the
+ *                        discovered partitions
+ */
+class InMemoryFileIndex(
+    sparkSession: SparkSession,
+    override val rootPaths: Seq[Path],
+    parameters: Map[String, String],
+    partitionSchema: Option[StructType],
+    fileStatusCache: FileStatusCache = NoopCache)
+  extends PartitioningAwareFileIndex(
+    sparkSession, parameters, partitionSchema, fileStatusCache) {
+
+  @volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _
+  @volatile private var cachedLeafDirToChildrenFiles: Map[Path, Array[FileStatus]] = _
+  @volatile private var cachedPartitionSpec: PartitionSpec = _
+
+  refresh0()
+
+  override def partitionSpec(): PartitionSpec = {
+    if (cachedPartitionSpec == null) {
+      cachedPartitionSpec = inferPartitioning()
+    }
+    logTrace(s"Partition spec: $cachedPartitionSpec")
+    cachedPartitionSpec
+  }
+
+  override protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] = {
+    cachedLeafFiles
+  }
+
+  override protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = {
+    cachedLeafDirToChildrenFiles
+  }
+
+  override def refresh(): Unit = {
+    refresh0()
+    fileStatusCache.invalidateAll()
+  }
+
+  private def refresh0(): Unit = {
+    val files = listLeafFiles(rootPaths)
+    cachedLeafFiles =
+      new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
+    cachedLeafDirToChildrenFiles = files.toArray.groupBy(_.getPath.getParent)
+    cachedPartitionSpec = null
+  }
+
+  override def equals(other: Any): Boolean = other match {
+    case hdfs: InMemoryFileIndex => rootPaths.toSet == hdfs.rootPaths.toSet
+    case _ => false
+  }
+
+  override def hashCode(): Int = rootPaths.toSet.hashCode()
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
deleted file mode 100644
index d9d5883..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources
-
-import scala.collection.mutable
-
-import org.apache.hadoop.fs._
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.types.StructType
-
-
-/**
- * A [[FileCatalog]] that generates the list of files to process by recursively listing all the
- * files present in `paths`.
- *
- * @param rootPaths the list of root table paths to scan
- * @param parameters as set of options to control discovery
- * @param partitionSchema an optional partition schema that will be use to provide types for the
- *                        discovered partitions
- */
-class ListingFileCatalog(
-    sparkSession: SparkSession,
-    override val rootPaths: Seq[Path],
-    parameters: Map[String, String],
-    partitionSchema: Option[StructType],
-    fileStatusCache: FileStatusCache = NoopCache)
-  extends PartitioningAwareFileCatalog(
-    sparkSession, parameters, partitionSchema, fileStatusCache) {
-
-  @volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _
-  @volatile private var cachedLeafDirToChildrenFiles: Map[Path, Array[FileStatus]] = _
-  @volatile private var cachedPartitionSpec: PartitionSpec = _
-
-  refresh0()
-
-  override def partitionSpec(): PartitionSpec = {
-    if (cachedPartitionSpec == null) {
-      cachedPartitionSpec = inferPartitioning()
-    }
-    logTrace(s"Partition spec: $cachedPartitionSpec")
-    cachedPartitionSpec
-  }
-
-  override protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] = {
-    cachedLeafFiles
-  }
-
-  override protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = {
-    cachedLeafDirToChildrenFiles
-  }
-
-  override def refresh(): Unit = {
-    refresh0()
-    fileStatusCache.invalidateAll()
-  }
-
-  private def refresh0(): Unit = {
-    val files = listLeafFiles(rootPaths)
-    cachedLeafFiles =
-      new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
-    cachedLeafDirToChildrenFiles = files.toArray.groupBy(_.getPath.getParent)
-    cachedPartitionSpec = null
-  }
-
-  override def equals(other: Any): Boolean = other match {
-    case hdfs: ListingFileCatalog => rootPaths.toSet == hdfs.rootPaths.toSet
-    case _ => false
-  }
-
-  override def hashCode(): Int = rootPaths.toSet.hashCode()
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
deleted file mode 100644
index cc4049e..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
+++ /dev/null
@@ -1,437 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources
-
-import java.io.FileNotFoundException
-
-import scala.collection.mutable
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs._
-import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.metrics.source.HiveCatalogMetrics
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.{expressions, InternalRow}
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.types.{StringType, StructType}
-import org.apache.spark.util.SerializableConfiguration
-
-/**
- * An abstract class that represents [[FileCatalog]]s that are aware of partitioned tables.
- * It provides the necessary methods to parse partition data based on a set of files.
- *
- * @param parameters as set of options to control partition discovery
- * @param userPartitionSchema an optional partition schema that will be use to provide types for
- *                            the discovered partitions
- */
-abstract class PartitioningAwareFileCatalog(
-    sparkSession: SparkSession,
-    parameters: Map[String, String],
-    userPartitionSchema: Option[StructType],
-    fileStatusCache: FileStatusCache = NoopCache) extends FileCatalog with Logging {
-  import PartitioningAwareFileCatalog.BASE_PATH_PARAM
-
-  /** Returns the specification of the partitions inferred from the data. */
-  def partitionSpec(): PartitionSpec
-
-  override def partitionSchema: StructType = partitionSpec().partitionColumns
-
-  protected val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(parameters)
-
-  protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus]
-
-  protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]]
-
-  override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = {
-    val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) {
-      PartitionDirectory(InternalRow.empty, allFiles().filter(f => isDataPath(f.getPath))) :: Nil
-    } else {
-      prunePartitions(filters, partitionSpec()).map {
-        case PartitionPath(values, path) =>
-          val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match {
-            case Some(existingDir) =>
-              // Directory has children files in it, return them
-              existingDir.filter(f => isDataPath(f.getPath))
-
-            case None =>
-              // Directory does not exist, or has no children files
-              Nil
-          }
-          PartitionDirectory(values, files)
-      }
-    }
-    logTrace("Selected files after partition pruning:\n\t" + selectedPartitions.mkString("\n\t"))
-    selectedPartitions
-  }
-
-  /** Returns the list of files that will be read when scanning this relation. */
-  override def inputFiles: Array[String] =
-    allFiles().map(_.getPath.toUri.toString).toArray
-
-  override def sizeInBytes: Long = allFiles().map(_.getLen).sum
-
-  def allFiles(): Seq[FileStatus] = {
-    if (partitionSpec().partitionColumns.isEmpty) {
-      // For each of the root input paths, get the list of files inside them
-      rootPaths.flatMap { path =>
-        // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel).
-        val fs = path.getFileSystem(hadoopConf)
-        val qualifiedPathPre = fs.makeQualified(path)
-        val qualifiedPath: Path = if (qualifiedPathPre.isRoot && !qualifiedPathPre.isAbsolute) {
-          // SPARK-17613: Always append `Path.SEPARATOR` to the end of parent directories,
-          // because the `leafFile.getParent` would have returned an absolute path with the
-          // separator at the end.
-          new Path(qualifiedPathPre, Path.SEPARATOR)
-        } else {
-          qualifiedPathPre
-        }
-
-        // There are three cases possible with each path
-        // 1. The path is a directory and has children files in it. Then it must be present in
-        //    leafDirToChildrenFiles as those children files will have been found as leaf files.
-        //    Find its children files from leafDirToChildrenFiles and include them.
-        // 2. The path is a file, then it will be present in leafFiles. Include this path.
-        // 3. The path is a directory, but has no children files. Do not include this path.
-
-        leafDirToChildrenFiles.get(qualifiedPath)
-          .orElse { leafFiles.get(qualifiedPath).map(Array(_)) }
-          .getOrElse(Array.empty)
-      }
-    } else {
-      leafFiles.values.toSeq
-    }
-  }
-
-  protected def inferPartitioning(): PartitionSpec = {
-    // We use leaf dirs containing data files to discover the schema.
-    val leafDirs = leafDirToChildrenFiles.filter { case (_, files) =>
-      files.exists(f => isDataPath(f.getPath))
-    }.keys.toSeq
-    userPartitionSchema match {
-      case Some(userProvidedSchema) if userProvidedSchema.nonEmpty =>
-        val spec = PartitioningUtils.parsePartitions(
-          leafDirs,
-          PartitioningUtils.DEFAULT_PARTITION_NAME,
-          typeInference = false,
-          basePaths = basePaths)
-
-        // Without auto inference, all of value in the `row` should be null or in StringType,
-        // we need to cast into the data type that user specified.
-        def castPartitionValuesToUserSchema(row: InternalRow) = {
-          InternalRow((0 until row.numFields).map { i =>
-            Cast(
-              Literal.create(row.getUTF8String(i), StringType),
-              userProvidedSchema.fields(i).dataType).eval()
-          }: _*)
-        }
-
-        PartitionSpec(userProvidedSchema, spec.partitions.map { part =>
-          part.copy(values = castPartitionValuesToUserSchema(part.values))
-        })
-      case _ =>
-        PartitioningUtils.parsePartitions(
-          leafDirs,
-          PartitioningUtils.DEFAULT_PARTITION_NAME,
-          typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
-          basePaths = basePaths)
-    }
-  }
-
-  private def prunePartitions(
-      predicates: Seq[Expression],
-      partitionSpec: PartitionSpec): Seq[PartitionPath] = {
-    val PartitionSpec(partitionColumns, partitions) = partitionSpec
-    val partitionColumnNames = partitionColumns.map(_.name).toSet
-    val partitionPruningPredicates = predicates.filter {
-      _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
-    }
-
-    if (partitionPruningPredicates.nonEmpty) {
-      val predicate = partitionPruningPredicates.reduce(expressions.And)
-
-      val boundPredicate = InterpretedPredicate.create(predicate.transform {
-        case a: AttributeReference =>
-          val index = partitionColumns.indexWhere(a.name == _.name)
-          BoundReference(index, partitionColumns(index).dataType, nullable = true)
-      })
-
-      val selected = partitions.filter {
-        case PartitionPath(values, _) => boundPredicate(values)
-      }
-      logInfo {
-        val total = partitions.length
-        val selectedSize = selected.length
-        val percentPruned = (1 - selectedSize.toDouble / total.toDouble) * 100
-        s"Selected $selectedSize partitions out of $total, pruned $percentPruned% partitions."
-      }
-
-      selected
-    } else {
-      partitions
-    }
-  }
-
-  /**
-   * Contains a set of paths that are considered as the base dirs of the input datasets.
-   * The partitioning discovery logic will make sure it will stop when it reaches any
-   * base path.
-   *
-   * By default, the paths of the dataset provided by users will be base paths.
-   * Below are three typical examples,
-   * Case 1) `spark.read.parquet("/path/something=true/")`: the base path will be
-   * `/path/something=true/`, and the returned DataFrame will not contain a column of `something`.
-   * Case 2) `spark.read.parquet("/path/something=true/a.parquet")`: the base path will be
-   * still `/path/something=true/`, and the returned DataFrame will also not contain a column of
-   * `something`.
-   * Case 3) `spark.read.parquet("/path/")`: the base path will be `/path/`, and the returned
-   * DataFrame will have the column of `something`.
-   *
-   * Users also can override the basePath by setting `basePath` in the options to pass the new base
-   * path to the data source.
-   * For example, `spark.read.option("basePath", "/path/").parquet("/path/something=true/")`,
-   * and the returned DataFrame will have the column of `something`.
-   */
-  private def basePaths: Set[Path] = {
-    parameters.get(BASE_PATH_PARAM).map(new Path(_)) match {
-      case Some(userDefinedBasePath) =>
-        val fs = userDefinedBasePath.getFileSystem(hadoopConf)
-        if (!fs.isDirectory(userDefinedBasePath)) {
-          throw new IllegalArgumentException(s"Option '$BASE_PATH_PARAM' must be a directory")
-        }
-        Set(fs.makeQualified(userDefinedBasePath))
-
-      case None =>
-        rootPaths.map { path =>
-          // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel).
-          val qualifiedPath = path.getFileSystem(hadoopConf).makeQualified(path)
-          if (leafFiles.contains(qualifiedPath)) qualifiedPath.getParent else qualifiedPath }.toSet
-    }
-  }
-
-  // SPARK-15895: Metadata files (e.g. Parquet summary files) and temporary files should not be
-  // counted as data files, so that they shouldn't participate partition discovery.
-  private def isDataPath(path: Path): Boolean = {
-    val name = path.getName
-    !((name.startsWith("_") && !name.contains("=")) || name.startsWith("."))
-  }
-
-  /**
-   * List leaf files of given paths. This method will submit a Spark job to do parallel
-   * listing whenever there is a path having more files than the parallel partition discovery
-   * discovery threshold.
-   *
-   * This is publicly visible for testing.
-   */
-  def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
-    val output = mutable.LinkedHashSet[FileStatus]()
-    val pathsToFetch = mutable.ArrayBuffer[Path]()
-    for (path <- paths) {
-      fileStatusCache.getLeafFiles(path) match {
-        case Some(files) =>
-          HiveCatalogMetrics.incrementFileCacheHits(files.length)
-          output ++= files
-        case None =>
-          pathsToFetch += path
-      }
-    }
-    val discovered = if (pathsToFetch.length >=
-        sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
-      PartitioningAwareFileCatalog.listLeafFilesInParallel(pathsToFetch, hadoopConf, sparkSession)
-    } else {
-      PartitioningAwareFileCatalog.listLeafFilesInSerial(pathsToFetch, hadoopConf)
-    }
-    discovered.foreach { case (path, leafFiles) =>
-      HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size)
-      fileStatusCache.putLeafFiles(path, leafFiles.toArray)
-      output ++= leafFiles
-    }
-    output
-  }
-}
-
-object PartitioningAwareFileCatalog extends Logging {
-  val BASE_PATH_PARAM = "basePath"
-
-  /** A serializable variant of HDFS's BlockLocation. */
-  private case class SerializableBlockLocation(
-      names: Array[String],
-      hosts: Array[String],
-      offset: Long,
-      length: Long)
-
-  /** A serializable variant of HDFS's FileStatus. */
-  private case class SerializableFileStatus(
-      path: String,
-      length: Long,
-      isDir: Boolean,
-      blockReplication: Short,
-      blockSize: Long,
-      modificationTime: Long,
-      accessTime: Long,
-      blockLocations: Array[SerializableBlockLocation])
-
-  /**
-   * List a collection of path recursively.
-   */
-  private def listLeafFilesInSerial(
-      paths: Seq[Path],
-      hadoopConf: Configuration): Seq[(Path, Seq[FileStatus])] = {
-    // Dummy jobconf to get to the pathFilter defined in configuration
-    val jobConf = new JobConf(hadoopConf, this.getClass)
-    val filter = FileInputFormat.getInputPathFilter(jobConf)
-
-    paths.map { path =>
-      val fs = path.getFileSystem(hadoopConf)
-      (path, listLeafFiles0(fs, path, filter))
-    }
-  }
-
-  /**
-   * List a collection of path recursively in parallel (using Spark executors).
-   * Each task launched will use [[listLeafFilesInSerial]] to list.
-   */
-  private def listLeafFilesInParallel(
-      paths: Seq[Path],
-      hadoopConf: Configuration,
-      sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = {
-    assert(paths.size >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold)
-    logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}")
-
-    val sparkContext = sparkSession.sparkContext
-    val serializableConfiguration = new SerializableConfiguration(hadoopConf)
-    val serializedPaths = paths.map(_.toString)
-
-    // Set the number of parallelism to prevent following file listing from generating many tasks
-    // in case of large #defaultParallelism.
-    val numParallelism = Math.min(paths.size, 10000)
-
-    val statusMap = sparkContext
-      .parallelize(serializedPaths, numParallelism)
-      .mapPartitions { paths =>
-        val hadoopConf = serializableConfiguration.value
-        listLeafFilesInSerial(paths.map(new Path(_)).toSeq, hadoopConf).iterator
-      }.map { case (path, statuses) =>
-        val serializableStatuses = statuses.map { status =>
-          // Turn FileStatus into SerializableFileStatus so we can send it back to the driver
-          val blockLocations = status match {
-            case f: LocatedFileStatus =>
-              f.getBlockLocations.map { loc =>
-                SerializableBlockLocation(
-                  loc.getNames,
-                  loc.getHosts,
-                  loc.getOffset,
-                  loc.getLength)
-              }
-
-            case _ =>
-              Array.empty[SerializableBlockLocation]
-          }
-
-          SerializableFileStatus(
-            status.getPath.toString,
-            status.getLen,
-            status.isDirectory,
-            status.getReplication,
-            status.getBlockSize,
-            status.getModificationTime,
-            status.getAccessTime,
-            blockLocations)
-        }
-        (path.toString, serializableStatuses)
-      }.collect()
-
-    // turn SerializableFileStatus back to Status
-    statusMap.map { case (path, serializableStatuses) =>
-      val statuses = serializableStatuses.map { f =>
-        val blockLocations = f.blockLocations.map { loc =>
-          new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length)
-        }
-        new LocatedFileStatus(
-          new FileStatus(
-            f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime,
-            new Path(f.path)),
-          blockLocations)
-      }
-      (new Path(path), statuses)
-    }
-  }
-
-  /**
-   * List a single path, provided as a FileStatus, in serial.
-   */
-  private def listLeafFiles0(
-      fs: FileSystem, path: Path, filter: PathFilter): Seq[FileStatus] = {
-    logTrace(s"Listing $path")
-    val name = path.getName.toLowerCase
-    if (shouldFilterOut(name)) {
-      Seq.empty[FileStatus]
-    } else {
-      // [SPARK-17599] Prevent ListingFileCatalog from failing if path doesn't exist
-      // Note that statuses only include FileStatus for the files and dirs directly under path,
-      // and does not include anything else recursively.
-      val statuses = try fs.listStatus(path) catch {
-        case _: FileNotFoundException =>
-          logWarning(s"The directory $path was not found. Was it deleted very recently?")
-          Array.empty[FileStatus]
-      }
-
-      val allLeafStatuses = {
-        val (dirs, files) = statuses.partition(_.isDirectory)
-        val stats = files ++ dirs.flatMap(dir => listLeafFiles0(fs, dir.getPath, filter))
-        if (filter != null) stats.filter(f => filter.accept(f.getPath)) else stats
-      }
-
-      allLeafStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
-        case f: LocatedFileStatus =>
-          f
-
-        // NOTE:
-        //
-        // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
-        //   operations, calling `getFileBlockLocations` does no harm here since these file system
-        //   implementations don't actually issue RPC for this method.
-        //
-        // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
-        //   be a big deal since we always use to `listLeafFilesInParallel` when the number of
-        //   paths exceeds threshold.
-        case f =>
-          // The other constructor of LocatedFileStatus will call FileStatus.getPermission(),
-          // which is very slow on some file system (RawLocalFileSystem, which is launch a
-          // subprocess and parse the stdout).
-          val locations = fs.getFileBlockLocations(f, 0, f.getLen)
-          val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
-            f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
-          if (f.isSymlink) {
-            lfs.setSymlink(f.getSymlink)
-          }
-          lfs
-      }
-    }
-  }
-
-  /** Checks if we should filter out this path name. */
-  def shouldFilterOut(pathName: String): Boolean = {
-    // We filter everything that starts with _ and ., except _common_metadata and _metadata
-    // because Parquet needs to find those metadata files from leaf files returned by this method.
-    // We should refactor this logic to not mix metadata files with data files.
-    ((pathName.startsWith("_") && !pathName.contains("=")) || pathName.startsWith(".")) &&
-      !pathName.startsWith("_common_metadata") && !pathName.startsWith("_metadata")
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
new file mode 100644
index 0000000..a8a722d
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -0,0 +1,437 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.FileNotFoundException
+
+import scala.collection.mutable
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.metrics.source.HiveCatalogMetrics
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.{expressions, InternalRow}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types.{StringType, StructType}
+import org.apache.spark.util.SerializableConfiguration
+
+/**
+ * An abstract class that represents [[FileIndex]]s that are aware of partitioned tables.
+ * It provides the necessary methods to parse partition data based on a set of files.
+ *
+ * @param parameters a set of options to control partition discovery
+ * @param userPartitionSchema an optional partition schema that will be used to provide types for
+ *                            the discovered partitions
+ */
+abstract class PartitioningAwareFileIndex(
+    sparkSession: SparkSession,
+    parameters: Map[String, String],
+    userPartitionSchema: Option[StructType],
+    fileStatusCache: FileStatusCache = NoopCache) extends FileIndex with Logging {
+  import PartitioningAwareFileIndex.BASE_PATH_PARAM
+
+  /** Returns the specification of the partitions inferred from the data. */
+  def partitionSpec(): PartitionSpec
+
+  override def partitionSchema: StructType = partitionSpec().partitionColumns
+
+  protected val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(parameters)
+
+  protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus]
+
+  protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]]
+
+  override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = {
+    val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) {
+      PartitionDirectory(InternalRow.empty, allFiles().filter(f => isDataPath(f.getPath))) :: Nil
+    } else {
+      prunePartitions(filters, partitionSpec()).map {
+        case PartitionPath(values, path) =>
+          val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match {
+            case Some(existingDir) =>
+              // Directory has children files in it, return them
+              existingDir.filter(f => isDataPath(f.getPath))
+
+            case None =>
+              // Directory does not exist, or has no children files
+              Nil
+          }
+          PartitionDirectory(values, files)
+      }
+    }
+    logTrace("Selected files after partition pruning:\n\t" + selectedPartitions.mkString("\n\t"))
+    selectedPartitions
+  }
+
+  /** Returns the list of files that will be read when scanning this relation. */
+  override def inputFiles: Array[String] =
+    allFiles().map(_.getPath.toUri.toString).toArray
+
+  override def sizeInBytes: Long = allFiles().map(_.getLen).sum
+
+  def allFiles(): Seq[FileStatus] = {
+    if (partitionSpec().partitionColumns.isEmpty) {
+      // For each of the root input paths, get the list of files inside them
+      rootPaths.flatMap { path =>
+        // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel).
+        val fs = path.getFileSystem(hadoopConf)
+        val qualifiedPathPre = fs.makeQualified(path)
+        val qualifiedPath: Path = if (qualifiedPathPre.isRoot && !qualifiedPathPre.isAbsolute) {
+          // SPARK-17613: Always append `Path.SEPARATOR` to the end of parent directories,
+          // because the `leafFile.getParent` would have returned an absolute path with the
+          // separator at the end.
+          new Path(qualifiedPathPre, Path.SEPARATOR)
+        } else {
+          qualifiedPathPre
+        }
+
+        // There are three cases possible with each path
+        // 1. The path is a directory and has children files in it. Then it must be present in
+        //    leafDirToChildrenFiles as those children files will have been found as leaf files.
+        //    Find its children files from leafDirToChildrenFiles and include them.
+        // 2. The path is a file, then it will be present in leafFiles. Include this path.
+        // 3. The path is a directory, but has no children files. Do not include this path.
+
+        leafDirToChildrenFiles.get(qualifiedPath)
+          .orElse { leafFiles.get(qualifiedPath).map(Array(_)) }
+          .getOrElse(Array.empty)
+      }
+    } else {
+      leafFiles.values.toSeq
+    }
+  }
+
+  protected def inferPartitioning(): PartitionSpec = {
+    // We use leaf dirs containing data files to discover the schema.
+    val leafDirs = leafDirToChildrenFiles.filter { case (_, files) =>
+      files.exists(f => isDataPath(f.getPath))
+    }.keys.toSeq
+    userPartitionSchema match {
+      case Some(userProvidedSchema) if userProvidedSchema.nonEmpty =>
+        val spec = PartitioningUtils.parsePartitions(
+          leafDirs,
+          PartitioningUtils.DEFAULT_PARTITION_NAME,
+          typeInference = false,
+          basePaths = basePaths)
+
+        // Without auto inference, all values in the `row` should be null or of StringType,
+        // so we need to cast them into the data types that the user specified.
+        def castPartitionValuesToUserSchema(row: InternalRow) = {
+          InternalRow((0 until row.numFields).map { i =>
+            Cast(
+              Literal.create(row.getUTF8String(i), StringType),
+              userProvidedSchema.fields(i).dataType).eval()
+          }: _*)
+        }
+
+        PartitionSpec(userProvidedSchema, spec.partitions.map { part =>
+          part.copy(values = castPartitionValuesToUserSchema(part.values))
+        })
+      case _ =>
+        PartitioningUtils.parsePartitions(
+          leafDirs,
+          PartitioningUtils.DEFAULT_PARTITION_NAME,
+          typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
+          basePaths = basePaths)
+    }
+  }
+
+  private def prunePartitions(
+      predicates: Seq[Expression],
+      partitionSpec: PartitionSpec): Seq[PartitionPath] = {
+    val PartitionSpec(partitionColumns, partitions) = partitionSpec
+    val partitionColumnNames = partitionColumns.map(_.name).toSet
+    val partitionPruningPredicates = predicates.filter {
+      _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
+    }
+
+    if (partitionPruningPredicates.nonEmpty) {
+      val predicate = partitionPruningPredicates.reduce(expressions.And)
+
+      val boundPredicate = InterpretedPredicate.create(predicate.transform {
+        case a: AttributeReference =>
+          val index = partitionColumns.indexWhere(a.name == _.name)
+          BoundReference(index, partitionColumns(index).dataType, nullable = true)
+      })
+
+      val selected = partitions.filter {
+        case PartitionPath(values, _) => boundPredicate(values)
+      }
+      logInfo {
+        val total = partitions.length
+        val selectedSize = selected.length
+        val percentPruned = (1 - selectedSize.toDouble / total.toDouble) * 100
+        s"Selected $selectedSize partitions out of $total, pruned $percentPruned% partitions."
+      }
+
+      selected
+    } else {
+      partitions
+    }
+  }
+
+  /**
+   * Contains a set of paths that are considered as the base dirs of the input datasets.
+   * The partitioning discovery logic will make sure it stops when it reaches any
+   * base path.
+   *
+   * By default, the paths of the dataset provided by users will be base paths.
+   * Below are three typical examples,
+   * Case 1) `spark.read.parquet("/path/something=true/")`: the base path will be
+   * `/path/something=true/`, and the returned DataFrame will not contain a column of `something`.
+   * Case 2) `spark.read.parquet("/path/something=true/a.parquet")`: the base path will be
+   * still `/path/something=true/`, and the returned DataFrame will also not contain a column of
+   * `something`.
+   * Case 3) `spark.read.parquet("/path/")`: the base path will be `/path/`, and the returned
+   * DataFrame will have the column of `something`.
+   *
+   * Users can also override the basePath by setting `basePath` in the options to pass the new base
+   * path to the data source.
+   * For example, `spark.read.option("basePath", "/path/").parquet("/path/something=true/")`,
+   * and the returned DataFrame will have the column of `something`.
+   */
+  private def basePaths: Set[Path] = {
+    parameters.get(BASE_PATH_PARAM).map(new Path(_)) match {
+      case Some(userDefinedBasePath) =>
+        val fs = userDefinedBasePath.getFileSystem(hadoopConf)
+        if (!fs.isDirectory(userDefinedBasePath)) {
+          throw new IllegalArgumentException(s"Option '$BASE_PATH_PARAM' must be a directory")
+        }
+        Set(fs.makeQualified(userDefinedBasePath))
+
+      case None =>
+        rootPaths.map { path =>
+          // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel).
+          val qualifiedPath = path.getFileSystem(hadoopConf).makeQualified(path)
+          if (leafFiles.contains(qualifiedPath)) qualifiedPath.getParent else qualifiedPath }.toSet
+    }
+  }
+
+  // SPARK-15895: Metadata files (e.g. Parquet summary files) and temporary files should not be
+  // counted as data files, so that they do not participate in partition discovery.
+  private def isDataPath(path: Path): Boolean = {
+    val name = path.getName
+    !((name.startsWith("_") && !name.contains("=")) || name.startsWith("."))
+  }
+
+  /**
+   * List leaf files of the given paths. This method will submit a Spark job to do parallel
+   * listing whenever the number of paths to list exceeds the parallel partition discovery
+   * threshold.
+   *
+   * This is publicly visible for testing.
+   */
+  def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
+    val output = mutable.LinkedHashSet[FileStatus]()
+    val pathsToFetch = mutable.ArrayBuffer[Path]()
+    for (path <- paths) {
+      fileStatusCache.getLeafFiles(path) match {
+        case Some(files) =>
+          HiveCatalogMetrics.incrementFileCacheHits(files.length)
+          output ++= files
+        case None =>
+          pathsToFetch += path
+      }
+    }
+    val discovered = if (pathsToFetch.length >=
+        sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
+      PartitioningAwareFileIndex.listLeafFilesInParallel(pathsToFetch, hadoopConf, sparkSession)
+    } else {
+      PartitioningAwareFileIndex.listLeafFilesInSerial(pathsToFetch, hadoopConf)
+    }
+    discovered.foreach { case (path, leafFiles) =>
+      HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size)
+      fileStatusCache.putLeafFiles(path, leafFiles.toArray)
+      output ++= leafFiles
+    }
+    output
+  }
+}
+
+object PartitioningAwareFileIndex extends Logging {
+  val BASE_PATH_PARAM = "basePath"
+
+  /** A serializable variant of HDFS's BlockLocation. */
+  private case class SerializableBlockLocation(
+      names: Array[String],
+      hosts: Array[String],
+      offset: Long,
+      length: Long)
+
+  /** A serializable variant of HDFS's FileStatus. */
+  private case class SerializableFileStatus(
+      path: String,
+      length: Long,
+      isDir: Boolean,
+      blockReplication: Short,
+      blockSize: Long,
+      modificationTime: Long,
+      accessTime: Long,
+      blockLocations: Array[SerializableBlockLocation])
+
+  /**
+   * List a collection of paths recursively.
+   */
+  private def listLeafFilesInSerial(
+      paths: Seq[Path],
+      hadoopConf: Configuration): Seq[(Path, Seq[FileStatus])] = {
+    // Dummy jobconf to get to the pathFilter defined in configuration
+    val jobConf = new JobConf(hadoopConf, this.getClass)
+    val filter = FileInputFormat.getInputPathFilter(jobConf)
+
+    paths.map { path =>
+      val fs = path.getFileSystem(hadoopConf)
+      (path, listLeafFiles0(fs, path, filter))
+    }
+  }
+
+  /**
+   * List a collection of paths recursively in parallel (using Spark executors).
+   * Each task launched will use [[listLeafFilesInSerial]] to list.
+   */
+  private def listLeafFilesInParallel(
+      paths: Seq[Path],
+      hadoopConf: Configuration,
+      sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = {
+    assert(paths.size >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold)
+    logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}")
+
+    val sparkContext = sparkSession.sparkContext
+    val serializableConfiguration = new SerializableConfiguration(hadoopConf)
+    val serializedPaths = paths.map(_.toString)
+
+    // Cap the parallelism to prevent the following file listing from generating too many
+    // tasks when defaultParallelism is large.
+    val numParallelism = Math.min(paths.size, 10000)
+
+    val statusMap = sparkContext
+      .parallelize(serializedPaths, numParallelism)
+      .mapPartitions { paths =>
+        val hadoopConf = serializableConfiguration.value
+        listLeafFilesInSerial(paths.map(new Path(_)).toSeq, hadoopConf).iterator
+      }.map { case (path, statuses) =>
+        val serializableStatuses = statuses.map { status =>
+          // Turn FileStatus into SerializableFileStatus so we can send it back to the driver
+          val blockLocations = status match {
+            case f: LocatedFileStatus =>
+              f.getBlockLocations.map { loc =>
+                SerializableBlockLocation(
+                  loc.getNames,
+                  loc.getHosts,
+                  loc.getOffset,
+                  loc.getLength)
+              }
+
+            case _ =>
+              Array.empty[SerializableBlockLocation]
+          }
+
+          SerializableFileStatus(
+            status.getPath.toString,
+            status.getLen,
+            status.isDirectory,
+            status.getReplication,
+            status.getBlockSize,
+            status.getModificationTime,
+            status.getAccessTime,
+            blockLocations)
+        }
+        (path.toString, serializableStatuses)
+      }.collect()
+
+    // Turn SerializableFileStatus back to FileStatus
+    statusMap.map { case (path, serializableStatuses) =>
+      val statuses = serializableStatuses.map { f =>
+        val blockLocations = f.blockLocations.map { loc =>
+          new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length)
+        }
+        new LocatedFileStatus(
+          new FileStatus(
+            f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime,
+            new Path(f.path)),
+          blockLocations)
+      }
+      (new Path(path), statuses)
+    }
+  }
+
+  /**
+   * List the leaf files under a single path, in serial.
+   */
+  private def listLeafFiles0(
+      fs: FileSystem, path: Path, filter: PathFilter): Seq[FileStatus] = {
+    logTrace(s"Listing $path")
+    val name = path.getName.toLowerCase
+    if (shouldFilterOut(name)) {
+      Seq.empty[FileStatus]
+    } else {
+      // [SPARK-17599] Prevent InMemoryFileIndex from failing if path doesn't exist
+      // Note that statuses only include FileStatus for the files and dirs directly under path,
+      // and does not include anything else recursively.
+      val statuses = try fs.listStatus(path) catch {
+        case _: FileNotFoundException =>
+          logWarning(s"The directory $path was not found. Was it deleted very recently?")
+          Array.empty[FileStatus]
+      }
+
+      val allLeafStatuses = {
+        val (dirs, files) = statuses.partition(_.isDirectory)
+        val stats = files ++ dirs.flatMap(dir => listLeafFiles0(fs, dir.getPath, filter))
+        if (filter != null) stats.filter(f => filter.accept(f.getPath)) else stats
+      }
+
+      allLeafStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
+        case f: LocatedFileStatus =>
+          f
+
+        // NOTE:
+        //
+        // - Although the S3/S3A/S3N file systems can be quite slow for remote file metadata
+        //   operations, calling `getFileBlockLocations` does no harm here since these file system
+        //   implementations don't actually issue RPC for this method.
+        //
+        // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
+        //   be a big deal since we always use `listLeafFilesInParallel` when the number of
+        //   paths exceeds the threshold.
+        case f =>
+          // The other constructor of LocatedFileStatus will call FileStatus.getPermission(),
+          // which is very slow on some file systems (e.g. RawLocalFileSystem, which launches a
+          // subprocess and parses the stdout).
+          val locations = fs.getFileBlockLocations(f, 0, f.getLen)
+          val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
+            f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
+          if (f.isSymlink) {
+            lfs.setSymlink(f.getSymlink)
+          }
+          lfs
+      }
+    }
+  }
+
+  /** Checks if we should filter out this path name. */
+  def shouldFilterOut(pathName: String): Boolean = {
+    // We filter everything that starts with _ and ., except _common_metadata and _metadata
+    // because Parquet needs to find those metadata files from leaf files returned by this method.
+    // We should refactor this logic to not mix metadata files with data files.
+    ((pathName.startsWith("_") && !pathName.contains("=")) || pathName.startsWith(".")) &&
+      !pathName.startsWith("_common_metadata") && !pathName.startsWith("_metadata")
+  }
+}
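To make the `basePath` semantics documented in the new PartitioningAwareFileIndex.basePaths scaladoc concrete, here is a minimal sketch; the /tmp paths are hypothetical and `spark` is assumed to be an active SparkSession.

  // Hypothetical layout: /tmp/data/something=true/part-00000.parquet

  // Cases 1 and 2: the base path defaults to the path passed in, so the
  // returned DataFrame does not contain a `something` column.
  val df1 = spark.read.parquet("/tmp/data/something=true/")

  // Overriding basePath anchors partition discovery at /tmp/data/, so the
  // returned DataFrame does contain the `something` column.
  val df2 = spark.read
    .option("basePath", "/tmp/data/")
    .parquet("/tmp/data/something=true/")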

http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
index 8689017..8566a80 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
@@ -28,7 +28,7 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
         logicalRelation @
           LogicalRelation(fsRelation @
             HadoopFsRelation(
-              tableFileCatalog: TableFileCatalog,
+              catalogFileIndex: CatalogFileIndex,
               partitionSchema,
               _,
               _,
@@ -56,9 +56,9 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
         ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))
 
       if (partitionKeyFilters.nonEmpty) {
-        val prunedFileCatalog = tableFileCatalog.filterPartitions(partitionKeyFilters.toSeq)
+        val prunedFileIndex = catalogFileIndex.filterPartitions(partitionKeyFilters.toSeq)
         val prunedFsRelation =
-          fsRelation.copy(location = prunedFileCatalog)(sparkSession)
+          fsRelation.copy(location = prunedFileIndex)(sparkSession)
         val prunedLogicalRelation = logicalRelation.copy(
           relation = prunedFsRelation,
           expectedOutputAttributes = Some(logicalRelation.output))
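As a hedged illustration of what this rule does at the query level (the table and column names below are hypothetical): for a metastore table backed by a CatalogFileIndex, a filter that only references partition columns lets PruneFileSourcePartitions call filterPartitions and swap in a pruned index, so only the matching partitions' directories are listed.

  import spark.implicits._

  // Assume `events` is a partitioned metastore table with partition column `ds`.
  // The predicate references only `ds`, so the rule can prune partitions at
  // planning time instead of listing the whole table.
  val pruned = spark.table("events").where($"ds" === "2016-10-30")
  pruned.explain()  // the file scan should cover only the selected partition(s)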

http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
deleted file mode 100644
index b459df5..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources
-
-import org.apache.hadoop.fs.Path
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.types.StructType
-
-
-/**
- * A [[FileCatalog]] for a metastore catalog table.
- *
- * @param sparkSession a [[SparkSession]]
- * @param table the metadata of the table
- * @param sizeInBytes the table's data size in bytes
- */
-class TableFileCatalog(
-    sparkSession: SparkSession,
-    val table: CatalogTable,
-    override val sizeInBytes: Long) extends FileCatalog {
-
-  protected val hadoopConf = sparkSession.sessionState.newHadoopConf
-
-  private val fileStatusCache = FileStatusCache.newCache(sparkSession)
-
-  assert(table.identifier.database.isDefined,
-    "The table identifier must be qualified in TableFileCatalog")
-
-  private val baseLocation = table.storage.locationUri
-
-  override def partitionSchema: StructType = table.partitionSchema
-
-  override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq
-
-  override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = {
-    filterPartitions(filters).listFiles(Nil)
-  }
-
-  override def refresh(): Unit = fileStatusCache.invalidateAll()
-
-  /**
-   * Returns a [[ListingFileCatalog]] for this table restricted to the subset of partitions
-   * specified by the given partition-pruning filters.
-   *
-   * @param filters partition-pruning filters
-   */
-  def filterPartitions(filters: Seq[Expression]): ListingFileCatalog = {
-    if (table.partitionColumnNames.nonEmpty) {
-      val selectedPartitions = sparkSession.sessionState.catalog.listPartitionsByFilter(
-        table.identifier, filters)
-      val partitions = selectedPartitions.map { p =>
-        PartitionPath(p.toRow(partitionSchema), p.storage.locationUri.get)
-      }
-      val partitionSpec = PartitionSpec(partitionSchema, partitions)
-      new PrunedTableFileCatalog(
-        sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec)
-    } else {
-      new ListingFileCatalog(sparkSession, rootPaths, table.storage.properties, None)
-    }
-  }
-
-  override def inputFiles: Array[String] = filterPartitions(Nil).inputFiles
-
-  // `TableFileCatalog` may be a member of `HadoopFsRelation`, `HadoopFsRelation` may be a member
-  // of `LogicalRelation`, and `LogicalRelation` may be used as the cache key. So we need to
-  // implement `equals` and `hashCode` here, to make it work with cache lookup.
-  override def equals(o: Any): Boolean = o match {
-    case other: TableFileCatalog => this.table.identifier == other.table.identifier
-    case _ => false
-  }
-
-  override def hashCode(): Int = table.identifier.hashCode()
-}
-
-/**
- * An override of the standard HDFS listing based catalog, that overrides the partition spec with
- * the information from the metastore.
- *
- * @param tableBasePath The default base path of the Hive metastore table
- * @param partitionSpec The partition specifications from Hive metastore
- */
-private class PrunedTableFileCatalog(
-    sparkSession: SparkSession,
-    tableBasePath: Path,
-    fileStatusCache: FileStatusCache,
-    override val partitionSpec: PartitionSpec)
-  extends ListingFileCatalog(
-    sparkSession,
-    partitionSpec.partitions.map(_.path),
-    Map.empty,
-    Some(partitionSpec.partitionColumns),
-    fileStatusCache)
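The filterPartitions contract described in the deleted scaladoc above carries over to the renamed CatalogFileIndex (see the catalogFileIndex.filterPartitions call in PruneFileSourcePartitions earlier). A rough sketch of invoking it directly, assuming `index` is a CatalogFileIndex for a table partitioned by a string column `ds`; building the predicate by hand like this is purely illustrative, since in practice the planner supplies resolved partition-column predicates.

  import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
  import org.apache.spark.sql.types.StringType

  // Predicate equivalent to `ds = '2016-10-30'` over the partition column.
  val ds = AttributeReference("ds", StringType)()
  val prunedIndex = index.filterPartitions(Seq(EqualTo(ds, Literal("2016-10-30"))))

  // Only the selected partitions' directories will be listed.
  val partitionDirs = prunedIndex.listFiles(Nil)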

http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
index c14feea..b26edee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
@@ -146,7 +146,7 @@ abstract class CompactibleFileStreamLog[T: ClassTag](
    */
   def allFiles(): Array[T] = {
     var latestId = getLatest().map(_._1).getOrElse(-1L)
-    // There is a race condition when `FileStreamSink` is deleting old files and `StreamFileCatalog`
+    // There is a race condition when `FileStreamSink` is deleting old files and `StreamFileIndex`
     // is calling this method. This loop will retry the reading to deal with the
     // race condition.
     while (true) {
@@ -158,7 +158,7 @@ abstract class CompactibleFileStreamLog[T: ClassTag](
         } catch {
           case e: IOException =>
             // Another process using `CompactibleFileStreamLog` may delete the batch files when
-            // `StreamFileCatalog` are reading. However, it only happens when a compaction is
+            // `StreamFileIndex` is reading. However, it only happens when a compaction is
             // deleting old files. If so, let's try the next compaction batch and we should find it.
             // Otherwise, this is a real IO issue and we should throw it.
             latestId = nextCompactionBatchId(latestId, compactInterval)

http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index a392b82..680df01 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -24,7 +24,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
-import org.apache.spark.sql.execution.datasources.{DataSource, ListingFileCatalog, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{DataSource, InMemoryFileIndex, LogicalRelation}
 import org.apache.spark.sql.types.StructType
 
 /**
@@ -156,7 +156,7 @@ class FileStreamSource(
   private def fetchAllFiles(): Seq[(String, Long)] = {
     val startTime = System.nanoTime
     val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath)
-    val catalog = new ListingFileCatalog(sparkSession, globbedPaths, options, Some(new StructType))
+    val catalog = new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(new StructType))
     val files = catalog.allFiles().sortBy(_.getModificationTime).map { status =>
       (status.getPath.toUri.toString, status.getModificationTime)
     }
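For reference, the listing that fetchAllFiles performs above can be sketched in isolation with the renamed InMemoryFileIndex; the input path is hypothetical and `spark` is assumed to be an active SparkSession.

  import org.apache.hadoop.fs.Path
  import org.apache.spark.sql.execution.datasources.InMemoryFileIndex
  import org.apache.spark.sql.types.StructType

  // List leaf files under a directory without partition inference, then sort
  // them by modification time, as the file stream source does.
  val index = new InMemoryFileIndex(
    spark, Seq(new Path("/tmp/stream-input")), Map.empty, Some(new StructType))
  val files = index.allFiles().sortBy(_.getModificationTime)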

http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala
index 82b67cb..aeaa134 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala
@@ -26,11 +26,11 @@ import org.apache.spark.sql.execution.datasources._
 
 
 /**
- * A [[FileCatalog]] that generates the list of files to processing by reading them from the
+ * A [[FileIndex]] that generates the list of files to process by reading them from the
  * metadata log files generated by the [[FileStreamSink]].
  */
-class MetadataLogFileCatalog(sparkSession: SparkSession, path: Path)
-  extends PartitioningAwareFileCatalog(sparkSession, Map.empty, None) {
+class MetadataLogFileIndex(sparkSession: SparkSession, path: Path)
+  extends PartitioningAwareFileIndex(sparkSession, Map.empty, None) {
 
   private val metadataDirectory = new Path(path, FileStreamSink.metadataDir)
   logInfo(s"Reading streaming file log from $metadataDirectory")
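A hedged sketch of where the renamed MetadataLogFileIndex shows up in practice (paths are hypothetical, `streamingDf` is assumed to be a streaming DataFrame): the streaming write records every committed file in the sink's metadata log, and a later batch read of the same directory builds its file list from that log rather than from a raw directory listing, which is what the FileStreamSinkSuite change further down asserts.

  // Streaming write: FileStreamSink records each committed file in its metadata log.
  val query = streamingDf.writeStream
    .format("parquet")
    .option("checkpointLocation", "/tmp/checkpoint")
    .start("/tmp/sink-output")

  // Batch read of the sink's output: the file list comes from the metadata log
  // (via MetadataLogFileIndex), so uncommitted or partial files are ignored.
  val result = spark.read.parquet("/tmp/sink-output")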

http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
index 9c43169..56df1fa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
@@ -28,15 +28,15 @@ import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.test.SharedSQLContext
 
-class FileCatalogSuite extends SharedSQLContext {
+class FileIndexSuite extends SharedSQLContext {
 
-  test("ListingFileCatalog: leaf files are qualified paths") {
+  test("InMemoryFileIndex: leaf files are qualified paths") {
     withTempDir { dir =>
       val file = new File(dir, "text.txt")
       stringToFile(file, "text")
 
       val path = new Path(file.getCanonicalPath)
-      val catalog = new ListingFileCatalog(spark, Seq(path), Map.empty, None) {
+      val catalog = new InMemoryFileIndex(spark, Seq(path), Map.empty, None) {
         def leafFilePaths: Seq[Path] = leafFiles.keys.toSeq
         def leafDirPaths: Seq[Path] = leafDirToChildrenFiles.keys.toSeq
       }
@@ -45,7 +45,7 @@ class FileCatalogSuite extends SharedSQLContext {
     }
   }
 
-  test("ListingFileCatalog: input paths are converted to qualified paths") {
+  test("InMemoryFileIndex: input paths are converted to qualified paths") {
     withTempDir { dir =>
       val file = new File(dir, "text.txt")
       stringToFile(file, "text")
@@ -59,42 +59,42 @@ class FileCatalogSuite extends SharedSQLContext {
       val qualifiedFilePath = fs.makeQualified(new Path(file.getCanonicalPath))
       require(qualifiedFilePath.toString.startsWith("file:"))
 
-      val catalog1 = new ListingFileCatalog(
+      val catalog1 = new InMemoryFileIndex(
         spark, Seq(unqualifiedDirPath), Map.empty, None)
       assert(catalog1.allFiles.map(_.getPath) === Seq(qualifiedFilePath))
 
-      val catalog2 = new ListingFileCatalog(
+      val catalog2 = new InMemoryFileIndex(
         spark, Seq(unqualifiedFilePath), Map.empty, None)
       assert(catalog2.allFiles.map(_.getPath) === Seq(qualifiedFilePath))
 
     }
   }
 
-  test("ListingFileCatalog: folders that don't exist don't throw exceptions") {
+  test("InMemoryFileIndex: folders that don't exist don't throw exceptions") {
     withTempDir { dir =>
       val deletedFolder = new File(dir, "deleted")
       assert(!deletedFolder.exists())
-      val catalog1 = new ListingFileCatalog(
+      val catalog1 = new InMemoryFileIndex(
         spark, Seq(new Path(deletedFolder.getCanonicalPath)), Map.empty, None)
       // doesn't throw an exception
       assert(catalog1.listLeafFiles(catalog1.rootPaths).isEmpty)
     }
   }
 
-  test("PartitioningAwareFileCatalog - file filtering") {
-    assert(!PartitioningAwareFileCatalog.shouldFilterOut("abcd"))
-    assert(PartitioningAwareFileCatalog.shouldFilterOut(".ab"))
-    assert(PartitioningAwareFileCatalog.shouldFilterOut("_cd"))
-    assert(!PartitioningAwareFileCatalog.shouldFilterOut("_metadata"))
-    assert(!PartitioningAwareFileCatalog.shouldFilterOut("_common_metadata"))
-    assert(PartitioningAwareFileCatalog.shouldFilterOut("_ab_metadata"))
-    assert(PartitioningAwareFileCatalog.shouldFilterOut("_cd_common_metadata"))
+  test("PartitioningAwareFileIndex - file filtering") {
+    assert(!PartitioningAwareFileIndex.shouldFilterOut("abcd"))
+    assert(PartitioningAwareFileIndex.shouldFilterOut(".ab"))
+    assert(PartitioningAwareFileIndex.shouldFilterOut("_cd"))
+    assert(!PartitioningAwareFileIndex.shouldFilterOut("_metadata"))
+    assert(!PartitioningAwareFileIndex.shouldFilterOut("_common_metadata"))
+    assert(PartitioningAwareFileIndex.shouldFilterOut("_ab_metadata"))
+    assert(PartitioningAwareFileIndex.shouldFilterOut("_cd_common_metadata"))
   }
 
-  test("SPARK-17613 - PartitioningAwareFileCatalog: base path w/o '/' at end") {
+  test("SPARK-17613 - PartitioningAwareFileIndex: base path w/o '/' at end") {
     class MockCatalog(
       override val rootPaths: Seq[Path])
-      extends PartitioningAwareFileCatalog(spark, Map.empty, None) {
+      extends PartitioningAwareFileIndex(spark, Map.empty, None) {
 
       override def refresh(): Unit = {}
 

http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index c32254d..d900ce7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -393,7 +393,7 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
           util.stringToFile(file, fileName)
         }
 
-        val fileCatalog = new ListingFileCatalog(
+        val fileCatalog = new InMemoryFileIndex(
           sparkSession = spark,
           rootPaths = Seq(new Path(tempDir)),
           parameters = Map.empty[String, String],

http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index f2a209e..120a3a2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -634,7 +634,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
       val queryExecution = spark.read.parquet(dir.getCanonicalPath).queryExecution
       queryExecution.analyzed.collectFirst {
         case LogicalRelation(
-            HadoopFsRelation(location: PartitioningAwareFileCatalog, _, _, _, _, _), _, _) =>
+            HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _), _, _) =>
           assert(location.partitionSpec() === PartitionSpec.emptySpec)
       }.getOrElse {
         fail(s"Expecting a matching HadoopFsRelation, but got:\n$queryExecution")

http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 19c89f5..18b42a8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.streaming.{FileStreamSinkWriter, MemoryStream, MetadataLogFileCatalog}
+import org.apache.spark.sql.execution.streaming.{FileStreamSinkWriter, MemoryStream, MetadataLogFileIndex}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
@@ -179,14 +179,14 @@ class FileStreamSinkSuite extends StreamTest {
         .add(StructField("id", IntegerType))
       assert(outputDf.schema === expectedSchema)
 
-      // Verify that MetadataLogFileCatalog is being used and the correct partitioning schema has
+      // Verify that MetadataLogFileIndex is being used and the correct partitioning schema has
       // been inferred
       val hadoopdFsRelations = outputDf.queryExecution.analyzed.collect {
         case LogicalRelation(baseRelation, _, _) if baseRelation.isInstanceOf[HadoopFsRelation] =>
           baseRelation.asInstanceOf[HadoopFsRelation]
       }
       assert(hadoopdFsRelations.size === 1)
-      assert(hadoopdFsRelations.head.location.isInstanceOf[MetadataLogFileCatalog])
+      assert(hadoopdFsRelations.head.location.isInstanceOf[MetadataLogFileIndex])
       assert(hadoopdFsRelations.head.partitionSchema.exists(_.name == "id"))
       assert(hadoopdFsRelations.head.dataSchema.exists(_.name == "value"))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/90d3b91f/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index b9e9da9..47018b3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -879,7 +879,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     val numFiles = 10000
 
     // This is to avoid running a Spark job to list the files in parallel
-    // by the ListingFileCatalog.
+    // by the InMemoryFileIndex.
     spark.sessionState.conf.setConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD, numFiles * 2)
 
     withTempDirs { case (root, tmp) =>

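The threshold mentioned in the comment above is the same one PartitioningAwareFileIndex.listLeafFiles consults before launching a parallel listing job. A hedged example of adjusting it at runtime, assuming the conf key behind SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD is `spark.sql.sources.parallelPartitionDiscovery.threshold`:

  // Keep listing on the driver (serial) for up to 1000 input paths;
  // beyond that, a Spark job is used to list leaf files in parallel.
  spark.conf.set("spark.sql.sources.parallelPartitionDiscovery.threshold", "1000")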

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org