Posted to commits@spark.apache.org by rx...@apache.org on 2016/10/15 01:26:18 UTC

[2/2] spark git commit: [SPARK-16980][SQL] Load only catalog table partition metadata required to answer a query

[SPARK-16980][SQL] Load only catalog table partition metadata required to answer a query

(This PR addresses https://issues.apache.org/jira/browse/SPARK-16980.)

## What changes were proposed in this pull request?

In a new Spark session, when a partitioned Hive table is converted to use Spark's `HadoopFsRelation` in `HiveMetastoreCatalog`, metadata for every partition of that table is retrieved from the metastore and loaded into driver memory. In addition, every partition's metadata files are read from the filesystem to perform schema inference.

If a user queries such a table with predicates that prune its partitions, we would like to answer that query without consulting metadata for partitions that are not involved in the query. When querying a table with a large number of partitions for data that lives in only a few of them (perhaps even a single partition), the current conversion strategy is highly inefficient. I suspect this scenario is not uncommon in the wild.

In addition to being inefficient in running time, the current strategy is inefficient in its use of driver memory. When the total number of partitions across all tables loaded in a driver reaches a certain level (somewhere in the tens of thousands), their cached metadata exhausts all driver heap memory in the default configuration. I suspect this scenario is less common (in that not many deployments work with tables having tens of thousands of partitions); however, it does illustrate how large the memory footprint of this metadata can be. With tables of hundreds or thousands of partitions, I would expect the `HiveMetastoreCatalog` table cache to represent a significant portion of the driver's heap space.

This PR proposes an alternative approach. Basically, it makes four changes:

1. It adds a new method, `listPartitionsByFilter`, to the Catalyst `ExternalCatalog` trait, which returns the partition metadata for a given sequence of partition-pruning predicates (a minimal signature sketch follows this list).
1. It refactors the `FileCatalog` type hierarchy to include a new `TableFileCatalog` that efficiently returns files only for partitions matching a sequence of partition-pruning predicates.
1. It removes partition loading and caching from `HiveMetastoreCatalog`.
1. It adds a new Catalyst optimizer rule, `PruneFileSourcePartitions`, which applies a plan's partition-pruning predicates to prune out unnecessary partition files from a `HadoopFsRelation`'s underlying file catalog.
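
For orientation, here is a minimal sketch of the new catalog entry point (the actual signature appears in the `ExternalCatalog.scala` and `InMemoryCatalog.scala` hunks further down; the wrapper trait name here is purely illustrative):

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
import org.apache.spark.sql.catalyst.expressions.Expression

// Sketch only: the partition-pruning entry point added to the external catalog.
// Callers pass Catalyst predicate expressions over the partition columns;
// implementations return metadata for the matching partitions only.
trait PartitionPruningCatalogSketch {
  def listPartitionsByFilter(
      db: String,
      table: String,
      predicates: Seq[Expression]): Seq[CatalogTablePartition]
}
```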

The net effect is that when a query over a partitioned Hive table is planned, the analyzer retrieves the table metadata from `HiveMetastoreCatalog`. As part of this operation, the `HiveMetastoreCatalog` builds a `HadoopFsRelation` with a `TableFileCatalog`. It does not load any partition metadata or scan any files. The optimizer prunes away unnecessary table partitions by sending the partition-pruning predicates to the relation's `TableFileCatalog`. The `TableFileCatalog` in turn calls the `listPartitionsByFilter` method on its external catalog. This queries the Hive metastore, passing along those filters.
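
To illustrate the intended behavior from a Spark shell (the table name `events` and partition column `ds` are assumptions for illustration, not part of this patch):

```scala
// Hypothetical example: planning this query should fetch metadata only for
// the partitions of `events` matching ds = '2016-10-14', rather than for
// every partition of the table.
val df = spark.table("events").where("ds = '2016-10-14'")

// The pruning is visible in the physical plan: the file scan's metadata
// (e.g. "PartitionCount" and "PartitionFilters", added to FileSourceScanExec
// in this patch) reflects only the selected partitions.
df.explain()
```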

As a bonus, performing partition pruning during optimization leads to a more accurate relation size estimate. This, along with c481bdf, can lead to automatic, safe application of the broadcast optimization in a join where it might previously have been omitted.

## Open Issues

1. This PR omits partition metadata caching. I can add this once the overall strategy for the cold path is established, perhaps in a future PR.
1. This PR removes and omits partitioned Hive table schema reconciliation. As a result, it fails to find Parquet schema columns with upper-case letters because of the Hive metastore's case insensitivity. This issue may be fixed by #14750, but that PR appears to have stalled. ericl has contributed a workaround for Parquet to this PR wherein schema reconciliation occurs at query execution time instead of at planning time. Whether ORC requires a similar patch is an open issue.
1. This PR omits an implementation of `listPartitionsByFilter` for the `InMemoryCatalog`.
1. This PR breaks Parquet log output redirection during query execution. I can work around this by running `Class.forName("org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$")` first thing in a Spark shell session (see the snippet after this list), but I haven't figured out how to fix this properly.
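
For copy-paste convenience, the workaround mentioned in the last open issue above, run first thing in a Spark shell session:

```scala
// Workaround for the broken Parquet log output redirection: force-load the
// ParquetFileFormat companion object before running any queries.
Class.forName("org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$")
```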

## How was this patch tested?

The current Spark unit tests were run, and some ad-hoc tests were performed to validate that only the necessary partition metadata is loaded.
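
A sketch of what such an ad-hoc check could look like, using the `HiveCatalogMetrics` source added in this patch (the table name, partition column, and expected count are assumptions):

```scala
import org.apache.spark.metrics.source.HiveCatalogMetrics

// Reset the new catalog metrics, run a query that should prune to a single
// partition, and check how many partition metadata entries were fetched.
HiveCatalogMetrics.reset()
spark.sql("SELECT count(*) FROM events WHERE ds = '2016-10-14'").collect()

val fetched = HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount()
assert(fetched <= 1, s"expected at most one partition to be fetched, got $fetched")
```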

Author: Michael Allman <mi...@videoamp.com>
Author: Eric Liang <ek...@databricks.com>
Author: Eric Liang <ek...@gmail.com>

Closes #14690 from mallman/spark-16980-lazy_partition_fetching.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6ce1b675
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6ce1b675
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6ce1b675

Branch: refs/heads/master
Commit: 6ce1b675ee9fc9a6034439c3ca00441f9f172f84
Parents: 2d96d35
Author: Michael Allman <mi...@videoamp.com>
Authored: Fri Oct 14 18:26:18 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Fri Oct 14 18:26:18 2016 -0700

----------------------------------------------------------------------
 .../spark/metrics/source/StaticSources.scala    |  34 ++-
 .../sql/catalyst/catalog/ExternalCatalog.scala  |   5 +-
 .../sql/catalyst/catalog/InMemoryCatalog.scala  |   4 +-
 .../spark/sql/catalyst/catalog/interface.scala  |  15 +-
 .../scala/org/apache/spark/sql/Dataset.scala    |   4 +-
 .../spark/sql/execution/CacheManager.scala      |   2 +-
 .../sql/execution/DataSourceScanExec.scala      |  28 ++-
 .../spark/sql/execution/SparkOptimizer.scala    |   2 +
 .../command/createDataSourceTables.scala        |   2 +-
 .../sql/execution/datasources/DataSource.scala  |   4 +-
 .../datasources/DataSourceStrategy.scala        |   8 +-
 .../sql/execution/datasources/FileFormat.scala  |  46 +++-
 .../datasources/HadoopFsRelation.scala          |  16 +-
 .../datasources/ListingFileCatalog.scala        | 197 +---------------
 .../execution/datasources/LogicalRelation.scala |   2 +-
 .../PartitioningAwareFileCatalog.scala          |  24 +-
 .../datasources/PruneFileSourcePartitions.scala |  72 ++++++
 .../datasources/SessionFileCatalog.scala        | 225 +++++++++++++++++++
 .../datasources/TableFileCatalog.scala          | 113 ++++++++++
 .../parquet/ParquetReadSupport.scala            |   6 +-
 .../streaming/MetadataLogFileCatalog.scala      |   2 +-
 .../org/apache/spark/sql/internal/SQLConf.scala |   9 +
 .../datasources/FileCatalogSuite.scala          |   5 +-
 .../datasources/FileSourceStrategySuite.scala   |   2 +-
 .../datasources/ListingFileCatalogSuite.scala   |  34 ---
 .../datasources/SessionFileCatalogSuite.scala   |  34 +++
 .../ParquetPartitionDiscoverySuite.scala        |   6 +-
 .../parquet/ParquetSchemaSuite.scala            |  28 +++
 .../spark/sql/hive/HiveExternalCatalog.scala    |  37 ++-
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 126 ++++-------
 .../spark/sql/hive/client/HiveClient.scala      |  15 +-
 .../spark/sql/hive/client/HiveClientImpl.scala  |  19 +-
 .../spark/sql/hive/orc/OrcFileFormat.scala      |  12 +-
 .../spark/sql/hive/HiveDataFrameSuite.scala     | 109 ++++++++-
 .../spark/sql/hive/HiveMetadataCacheSuite.scala |  41 ++++
 .../spark/sql/hive/client/VersionsSuite.scala   |   4 +-
 .../spark/sql/hive/orc/OrcQuerySuite.scala      |  22 ++
 .../apache/spark/sql/hive/parquetSuites.scala   |  20 +-
 38 files changed, 940 insertions(+), 394 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6ce1b675/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala b/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala
index 6bba259..cf92a10 100644
--- a/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala
@@ -26,7 +26,7 @@ private[spark] object StaticSources {
    * The set of all static sources. These sources may be reported to from any class, including
    * static classes, without requiring reference to a SparkEnv.
    */
-  val allSources = Seq(CodegenMetrics)
+  val allSources = Seq(CodegenMetrics, HiveCatalogMetrics)
 }
 
 /**
@@ -60,3 +60,35 @@ object CodegenMetrics extends Source {
   val METRIC_GENERATED_METHOD_BYTECODE_SIZE =
     metricRegistry.histogram(MetricRegistry.name("generatedMethodSize"))
 }
+
+/**
+ * :: Experimental ::
+ * Metrics for access to the hive external catalog.
+ */
+@Experimental
+object HiveCatalogMetrics extends Source {
+  override val sourceName: String = "HiveExternalCatalog"
+  override val metricRegistry: MetricRegistry = new MetricRegistry()
+
+  /**
+   * Tracks the total number of partition metadata entries fetched via the client api.
+   */
+  val METRIC_PARTITIONS_FETCHED = metricRegistry.counter(MetricRegistry.name("partitionsFetched"))
+
+  /**
+   * Tracks the total number of files discovered off of the filesystem by ListingFileCatalog.
+   */
+  val METRIC_FILES_DISCOVERED = metricRegistry.counter(MetricRegistry.name("filesDiscovered"))
+
+  /**
+   * Resets the values of all metrics to zero. This is useful in tests.
+   */
+  def reset(): Unit = {
+    METRIC_PARTITIONS_FETCHED.dec(METRIC_PARTITIONS_FETCHED.getCount())
+    METRIC_FILES_DISCOVERED.dec(METRIC_FILES_DISCOVERED.getCount())
+  }
+
+  // clients can use these to avoid classloader issues with the codahale classes
+  def incrementFetchedPartitions(n: Int): Unit = METRIC_PARTITIONS_FETCHED.inc(n)
+  def incrementFilesDiscovered(n: Int): Unit = METRIC_FILES_DISCOVERED.inc(n)
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6ce1b675/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
index 348d3d0..a5e0252 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -198,11 +198,12 @@ abstract class ExternalCatalog {
       partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition]
 
   /**
-   * List the metadata of selected partitions according to the given partition predicates.
+   * List the metadata of partitions that belong to the specified table, assuming it exists, that
+   * satisfy the given partition-pruning predicate expressions.
    *
    * @param db database name
    * @param table table name
-   * @param predicates partition predicated
+   * @param predicates  partition-pruning predicates
    */
   def listPartitionsByFilter(
       db: String,

http://git-wip-us.apache.org/repos/asf/spark/blob/6ce1b675/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 49280f8..f95c9f8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -482,7 +482,9 @@ class InMemoryCatalog(
       db: String,
       table: String,
       predicates: Seq[Expression]): Seq[CatalogTablePartition] = {
-    throw new UnsupportedOperationException("listPartitionsByFilter is not implemented.")
+    // TODO: Provide an implementation
+    throw new UnsupportedOperationException(
+      "listPartitionsByFilter is not implemented for InMemoryCatalog")
   }
 
   // --------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/6ce1b675/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 51326ca..1a57a77 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -20,11 +20,11 @@ package org.apache.spark.sql.catalyst.catalog
 import java.util.Date
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{StructField, StructType}
 
 
 /**
@@ -97,6 +97,15 @@ case class CatalogTablePartition(
 
     output.filter(_.nonEmpty).mkString("CatalogPartition(\n\t", "\n\t", ")")
   }
+
+  /**
+   * Given the partition schema, returns a row with that schema holding the partition values.
+   */
+  def toRow(partitionSchema: StructType): InternalRow = {
+    InternalRow.fromSeq(partitionSchema.map { case StructField(name, dataType, _, _) =>
+      Cast(Literal(spec(name)), dataType).eval()
+    })
+  }
 }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6ce1b675/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 7ae3275..7dccbbd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -43,7 +43,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.usePrettyExpression
 import org.apache.spark.sql.execution.{FileRelation, LogicalRDD, QueryExecution, SQLExecution}
 import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand, GlobalTempView, LocalTempView}
-import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.execution.datasources.{FileCatalog, HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
 import org.apache.spark.sql.execution.python.EvaluatePython
 import org.apache.spark.sql.streaming.{DataStreamWriter, StreamingQuery}
@@ -2614,7 +2614,7 @@ class Dataset[T] private[sql](
    * @since 2.0.0
    */
   def inputFiles: Array[String] = {
-    val files: Seq[String] = logicalPlan.collect {
+    val files: Seq[String] = queryExecution.optimizedPlan.collect {
       case LogicalRelation(fsBasedRelation: FileRelation, _, _) =>
         fsBasedRelation.inputFiles
       case fr: FileRelation =>

http://git-wip-us.apache.org/repos/asf/spark/blob/6ce1b675/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 83b7c77..92fd366 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -185,7 +185,7 @@ class CacheManager extends Logging {
     plan match {
       case lr: LogicalRelation => lr.relation match {
         case hr: HadoopFsRelation =>
-          val invalidate = hr.location.paths
+          val invalidate = hr.location.rootPaths
             .map(_.makeQualified(fs.getUri, fs.getWorkingDirectory))
             .contains(qualifiedPath)
           if (invalidate) hr.location.refresh()

http://git-wip-us.apache.org/repos/asf/spark/blob/6ce1b675/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 6cdba40..623d2be 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -225,13 +225,27 @@ case class FileSourceScanExec(
   }
 
   // These metadata values make scan plans uniquely identifiable for equality checking.
-  override val metadata: Map[String, String] = Map(
-    "Format" -> relation.fileFormat.toString,
-    "ReadSchema" -> outputSchema.catalogString,
-    "Batched" -> supportsBatch.toString,
-    "PartitionFilters" -> partitionFilters.mkString("[", ", ", "]"),
-    "PushedFilters" -> dataFilters.mkString("[", ", ", "]"),
-    "InputPaths" -> relation.location.paths.mkString(", "))
+  override val metadata: Map[String, String] = {
+    def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]")
+    val location = relation.location
+    val locationDesc =
+      location.getClass.getSimpleName + seqToString(location.rootPaths)
+    val metadata =
+      Map(
+        "Format" -> relation.fileFormat.toString,
+        "ReadSchema" -> outputSchema.catalogString,
+        "Batched" -> supportsBatch.toString,
+        "PartitionFilters" -> seqToString(partitionFilters),
+        "PushedFilters" -> seqToString(dataFilters),
+        "Location" -> locationDesc)
+    val withOptPartitionCount =
+      relation.partitionSchemaOption.map { _ =>
+        metadata + ("PartitionCount" -> selectedPartitions.size.toString)
+      } getOrElse {
+        metadata
+      }
+    withOptPartitionCount
+  }
 
   private lazy val inputRDD: RDD[InternalRow] = {
     val readFile: (PartitionedFile) => Iterator[InternalRow] =

http://git-wip-us.apache.org/repos/asf/spark/blob/6ce1b675/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
index 8b762b5..9817283 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution
 import org.apache.spark.sql.ExperimentalMethods
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
+import org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions
 import org.apache.spark.sql.execution.python.ExtractPythonUDFFromAggregate
 import org.apache.spark.sql.internal.SQLConf
 
@@ -32,5 +33,6 @@ class SparkOptimizer(
   override def batches: Seq[Batch] = super.batches :+
     Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog, conf)) :+
     Batch("Extract Python UDF from Aggregate", Once, ExtractPythonUDFFromAggregate) :+
+    Batch("Prune File Source Table Partitions", Once, PruneFileSourcePartitions) :+
     Batch("User Provided Optimizers", fixedPoint, experimentalMethods.extraOptimizations: _*)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6ce1b675/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index a04a13e..a8c75a7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -67,7 +67,7 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
 
     dataSource match {
       case fs: HadoopFsRelation =>
-        if (table.tableType == CatalogTableType.EXTERNAL && fs.location.paths.isEmpty) {
+        if (table.tableType == CatalogTableType.EXTERNAL && fs.location.rootPaths.isEmpty) {
           throw new AnalysisException(
             "Cannot create a file-based external data source table without path")
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/6ce1b675/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index e75e7d2..92b1fff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -471,9 +471,7 @@ case class DataSource(
           val existingPartitionColumns = Try {
             resolveRelation()
               .asInstanceOf[HadoopFsRelation]
-              .location
-              .partitionSpec()
-              .partitionColumns
+              .partitionSchema
               .fieldNames
               .toSeq
           }.getOrElse(Seq.empty[String])

http://git-wip-us.apache.org/repos/asf/spark/blob/6ce1b675/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 6f9ed50..7d0abe8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -163,14 +163,14 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
         if query.resolved && t.schema.asNullable == query.schema.asNullable =>
 
       // Sanity checks
-      if (t.location.paths.size != 1) {
+      if (t.location.rootPaths.size != 1) {
         throw new AnalysisException(
           "Can only write data to relations with a single path.")
       }
 
-      val outputPath = t.location.paths.head
+      val outputPath = t.location.rootPaths.head
       val inputPaths = query.collect {
-        case LogicalRelation(r: HadoopFsRelation, _, _) => r.location.paths
+        case LogicalRelation(r: HadoopFsRelation, _, _) => r.location.rootPaths
       }.flatten
 
       val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
@@ -184,7 +184,7 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
         query.resolve(t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver),
         t.bucketSpec,
         t.fileFormat,
-        () => t.refresh(),
+        () => t.location.refresh(),
         t.options,
         query,
         mode)

http://git-wip-us.apache.org/repos/asf/spark/blob/6ce1b675/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
index bde2d2b..e7239ef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjectio
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 
+
 /**
  * Used to read and write data stored in files to/from the [[InternalRow]] format.
  */
@@ -182,16 +183,17 @@ abstract class TextBasedFileFormat extends FileFormat {
 case class Partition(values: InternalRow, files: Seq[FileStatus])
 
 /**
- * An interface for objects capable of enumerating the files that comprise a relation as well
- * as the partitioning characteristics of those files.
+ * An interface for objects capable of enumerating the root paths of a relation as well as the
+ * partitions of a relation subject to some pruning expressions.
  */
-trait FileCatalog {
-
-  /** Returns the list of input paths from which the catalog will get files. */
-  def paths: Seq[Path]
+trait BasicFileCatalog {
 
-  /** Returns the specification of the partitions inferred from the data. */
-  def partitionSpec(): PartitionSpec
+  /**
+   * Returns the list of root input paths from which the catalog will get files. There may be a
+   * single root path from which partitions are discovered, or individual partitions may be
+   * specified by each path.
+   */
+  def rootPaths: Seq[Path]
 
   /**
    * Returns all valid files grouped into partitions when the data is partitioned. If the data is
@@ -204,9 +206,33 @@ trait FileCatalog {
    */
   def listFiles(filters: Seq[Expression]): Seq[Partition]
 
+  /** Returns the list of files that will be read when scanning this relation. */
+  def inputFiles: Array[String]
+
+  /** Refresh any cached file listings */
+  def refresh(): Unit
+
+  /** Sum of table file sizes, in bytes */
+  def sizeInBytes: Long
+}
+
+/**
+ * A [[BasicFileCatalog]] which can enumerate all of the files comprising a relation and, from
+ * those, infer the relation's partition specification.
+ */
+// TODO: Consider a more descriptive, appropriate name which suggests this is a file catalog for
+// which it is safe to list all of its files?
+trait FileCatalog extends BasicFileCatalog {
+
+  /** Returns the specification of the partitions inferred from the data. */
+  def partitionSpec(): PartitionSpec
+
   /** Returns all the valid files. */
   def allFiles(): Seq[FileStatus]
 
-  /** Refresh the file listing */
-  def refresh(): Unit
+  /** Returns the list of files that will be read when scanning this relation. */
+  override def inputFiles: Array[String] =
+    allFiles().map(_.getPath.toUri.toString).toArray
+
+  override def sizeInBytes: Long = allFiles().map(_.getLen).sum
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6ce1b675/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
index c7ebe0b..db889ed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
@@ -28,8 +28,8 @@ import org.apache.spark.sql.types.StructType
  * Acts as a container for all of the metadata required to read from a datasource. All discovery,
  * resolution and merging logic for schemas and partitions has been removed.
  *
- * @param location A [[FileCatalog]] that can enumerate the locations of all the files that comprise
- *                 this relation.
+ * @param location A [[BasicFileCatalog]] that can enumerate the locations of all the files that
+ *                 comprise this relation.
  * @param partitionSchema The schema of the columns (if any) that are used to partition the relation
  * @param dataSchema The schema of any remaining columns.  Note that if any partition columns are
  *                   present in the actual data files as well, they are preserved.
@@ -38,7 +38,7 @@ import org.apache.spark.sql.types.StructType
  * @param options Configuration used when reading / writing data.
  */
 case class HadoopFsRelation(
-    location: FileCatalog,
+    location: BasicFileCatalog,
     partitionSchema: StructType,
     dataSchema: StructType,
     bucketSpec: Option[BucketSpec],
@@ -58,10 +58,6 @@ case class HadoopFsRelation(
   def partitionSchemaOption: Option[StructType] =
     if (partitionSchema.isEmpty) None else Some(partitionSchema)
 
-  def partitionSpec: PartitionSpec = location.partitionSpec()
-
-  def refresh(): Unit = location.refresh()
-
   override def toString: String = {
     fileFormat match {
       case source: DataSourceRegister => source.shortName()
@@ -69,9 +65,7 @@ case class HadoopFsRelation(
     }
   }
 
-  /** Returns the list of files that will be read when scanning this relation. */
-  override def inputFiles: Array[String] =
-    location.allFiles().map(_.getPath.toUri.toString).toArray
+  override def sizeInBytes: Long = location.sizeInBytes
 
-  override def sizeInBytes: Long = location.allFiles().map(_.getLen).sum
+  override def inputFiles: Array[String] = location.inputFiles
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6ce1b675/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
index a68ae52..6d10501 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
@@ -17,32 +17,26 @@
 
 package org.apache.spark.sql.execution.datasources
 
-import java.io.FileNotFoundException
-
 import scala.collection.mutable
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
-import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 
-import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.SerializableConfiguration
 
 
 /**
  * A [[FileCatalog]] that generates the list of files to process by recursively listing all the
  * files present in `paths`.
  *
+ * @param rootPaths the list of root table paths to scan
  * @param parameters as set of options to control discovery
- * @param paths a list of paths to scan
  * @param partitionSchema an optional partition schema that will be use to provide types for the
  *                        discovered partitions
  */
 class ListingFileCatalog(
     sparkSession: SparkSession,
-    override val paths: Seq[Path],
+    override val rootPaths: Seq[Path],
     parameters: Map[String, String],
     partitionSchema: Option[StructType])
   extends PartitioningAwareFileCatalog(sparkSession, parameters, partitionSchema) {
@@ -70,198 +64,17 @@ class ListingFileCatalog(
   }
 
   override def refresh(): Unit = {
-    val files = listLeafFiles(paths)
+    val files = listLeafFiles(rootPaths)
     cachedLeafFiles =
       new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
     cachedLeafDirToChildrenFiles = files.toArray.groupBy(_.getPath.getParent)
     cachedPartitionSpec = null
   }
 
-  /**
-   * List leaf files of given paths. This method will submit a Spark job to do parallel
-   * listing whenever there is a path having more files than the parallel partition discovery
-   * discovery threshold.
-   *
-   * This is publicly visible for testing.
-   */
-  def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
-    val files =
-      if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
-        ListingFileCatalog.listLeafFilesInParallel(paths, hadoopConf, sparkSession)
-      } else {
-        ListingFileCatalog.listLeafFilesInSerial(paths, hadoopConf)
-      }
-
-    mutable.LinkedHashSet(files: _*)
-  }
-
   override def equals(other: Any): Boolean = other match {
-    case hdfs: ListingFileCatalog => paths.toSet == hdfs.paths.toSet
+    case hdfs: ListingFileCatalog => rootPaths.toSet == hdfs.rootPaths.toSet
     case _ => false
   }
 
-  override def hashCode(): Int = paths.toSet.hashCode()
-}
-
-
-object ListingFileCatalog extends Logging {
-
-  /** A serializable variant of HDFS's BlockLocation. */
-  private case class SerializableBlockLocation(
-      names: Array[String],
-      hosts: Array[String],
-      offset: Long,
-      length: Long)
-
-  /** A serializable variant of HDFS's FileStatus. */
-  private case class SerializableFileStatus(
-      path: String,
-      length: Long,
-      isDir: Boolean,
-      blockReplication: Short,
-      blockSize: Long,
-      modificationTime: Long,
-      accessTime: Long,
-      blockLocations: Array[SerializableBlockLocation])
-
-  /**
-   * List a collection of path recursively.
-   */
-  private def listLeafFilesInSerial(
-      paths: Seq[Path],
-      hadoopConf: Configuration): Seq[FileStatus] = {
-    // Dummy jobconf to get to the pathFilter defined in configuration
-    val jobConf = new JobConf(hadoopConf, this.getClass)
-    val filter = FileInputFormat.getInputPathFilter(jobConf)
-
-    paths.flatMap { path =>
-      val fs = path.getFileSystem(hadoopConf)
-      listLeafFiles0(fs, path, filter)
-    }
-  }
-
-  /**
-   * List a collection of path recursively in parallel (using Spark executors).
-   * Each task launched will use [[listLeafFilesInSerial]] to list.
-   */
-  private def listLeafFilesInParallel(
-      paths: Seq[Path],
-      hadoopConf: Configuration,
-      sparkSession: SparkSession): Seq[FileStatus] = {
-    assert(paths.size >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold)
-    logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}")
-
-    val sparkContext = sparkSession.sparkContext
-    val serializableConfiguration = new SerializableConfiguration(hadoopConf)
-    val serializedPaths = paths.map(_.toString)
-
-    // Set the number of parallelism to prevent following file listing from generating many tasks
-    // in case of large #defaultParallelism.
-    val numParallelism = Math.min(paths.size, 10000)
-
-    val statuses = sparkContext
-      .parallelize(serializedPaths, numParallelism)
-      .mapPartitions { paths =>
-        val hadoopConf = serializableConfiguration.value
-        listLeafFilesInSerial(paths.map(new Path(_)).toSeq, hadoopConf).iterator
-      }.map { status =>
-        // Turn FileStatus into SerializableFileStatus so we can send it back to the driver
-        val blockLocations = status match {
-          case f: LocatedFileStatus =>
-            f.getBlockLocations.map { loc =>
-              SerializableBlockLocation(
-                loc.getNames,
-                loc.getHosts,
-                loc.getOffset,
-                loc.getLength)
-            }
-
-          case _ =>
-            Array.empty[SerializableBlockLocation]
-        }
-
-        SerializableFileStatus(
-          status.getPath.toString,
-          status.getLen,
-          status.isDirectory,
-          status.getReplication,
-          status.getBlockSize,
-          status.getModificationTime,
-          status.getAccessTime,
-          blockLocations)
-      }.collect()
-
-    // Turn SerializableFileStatus back to Status
-    statuses.map { f =>
-      val blockLocations = f.blockLocations.map { loc =>
-        new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length)
-      }
-      new LocatedFileStatus(
-        new FileStatus(
-          f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime, new Path(f.path)),
-        blockLocations)
-    }
-  }
-
-  /**
-   * List a single path, provided as a FileStatus, in serial.
-   */
-  private def listLeafFiles0(
-      fs: FileSystem, path: Path, filter: PathFilter): Seq[FileStatus] = {
-    logTrace(s"Listing $path")
-    val name = path.getName.toLowerCase
-    if (shouldFilterOut(name)) {
-      Seq.empty[FileStatus]
-    } else {
-      // [SPARK-17599] Prevent ListingFileCatalog from failing if path doesn't exist
-      // Note that statuses only include FileStatus for the files and dirs directly under path,
-      // and does not include anything else recursively.
-      val statuses = try fs.listStatus(path) catch {
-        case _: FileNotFoundException =>
-          logWarning(s"The directory $path was not found. Was it deleted very recently?")
-          Array.empty[FileStatus]
-      }
-
-      val allLeafStatuses = {
-        val (dirs, files) = statuses.partition(_.isDirectory)
-        val stats = files ++ dirs.flatMap(dir => listLeafFiles0(fs, dir.getPath, filter))
-        if (filter != null) stats.filter(f => filter.accept(f.getPath)) else stats
-      }
-
-      allLeafStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
-        case f: LocatedFileStatus =>
-          f
-
-        // NOTE:
-        //
-        // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
-        //   operations, calling `getFileBlockLocations` does no harm here since these file system
-        //   implementations don't actually issue RPC for this method.
-        //
-        // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
-        //   be a big deal since we always use to `listLeafFilesInParallel` when the number of
-        //   paths exceeds threshold.
-        case f =>
-          // The other constructor of LocatedFileStatus will call FileStatus.getPermission(),
-          // which is very slow on some file system (RawLocalFileSystem, which is launch a
-          // subprocess and parse the stdout).
-          val locations = fs.getFileBlockLocations(f, 0, f.getLen)
-          val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
-            f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
-          if (f.isSymlink) {
-            lfs.setSymlink(f.getSymlink)
-          }
-          lfs
-      }
-    }
-  }
-
-  /** Checks if we should filter out this path name. */
-  def shouldFilterOut(pathName: String): Boolean = {
-    // We filter everything that starts with _ and ., except _common_metadata and _metadata
-    // because Parquet needs to find those metadata files from leaf files returned by this method.
-    // We should refactor this logic to not mix metadata files with data files.
-    ((pathName.startsWith("_") && !pathName.contains("=")) || pathName.startsWith(".")) &&
-      !pathName.startsWith("_common_metadata") && !pathName.startsWith("_metadata")
-  }
+  override def hashCode(): Int = rootPaths.toSet.hashCode()
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6ce1b675/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
index d9562fd..7c28d48 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
@@ -94,7 +94,7 @@ case class LogicalRelation(
   }
 
   override def refresh(): Unit = relation match {
-    case fs: HadoopFsRelation => fs.refresh()
+    case fs: HadoopFsRelation => fs.location.refresh()
     case _ =>  // Do nothing.
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6ce1b675/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
index 702ba97..b250811 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
@@ -21,7 +21,6 @@ import scala.collection.mutable
 
 import org.apache.hadoop.fs.{FileStatus, Path}
 
-import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.{expressions, InternalRow}
 import org.apache.spark.sql.catalyst.expressions._
@@ -40,9 +39,10 @@ abstract class PartitioningAwareFileCatalog(
     sparkSession: SparkSession,
     parameters: Map[String, String],
     partitionSchema: Option[StructType])
-  extends FileCatalog with Logging {
+  extends SessionFileCatalog(sparkSession) with FileCatalog {
+  import PartitioningAwareFileCatalog.BASE_PATH_PARAM
 
-  protected val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(parameters)
+  override protected val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(parameters)
 
   protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus]
 
@@ -72,8 +72,8 @@ abstract class PartitioningAwareFileCatalog(
 
   override def allFiles(): Seq[FileStatus] = {
     if (partitionSpec().partitionColumns.isEmpty) {
-      // For each of the input paths, get the list of files inside them
-      paths.flatMap { path =>
+      // For each of the root input paths, get the list of files inside them
+      rootPaths.flatMap { path =>
         // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel).
         val fs = path.getFileSystem(hadoopConf)
         val qualifiedPathPre = fs.makeQualified(path)
@@ -105,8 +105,6 @@ abstract class PartitioningAwareFileCatalog(
   protected def inferPartitioning(): PartitionSpec = {
     // We use leaf dirs containing data files to discover the schema.
     val leafDirs = leafDirToChildrenFiles.filter { case (_, files) =>
-      // SPARK-15895: Metadata files (e.g. Parquet summary files) and temporary files should not be
-      // counted as data files, so that they shouldn't participate partition discovery.
       files.exists(f => isDataPath(f.getPath))
     }.keys.toSeq
     partitionSchema match {
@@ -194,24 +192,30 @@ abstract class PartitioningAwareFileCatalog(
    * and the returned DataFrame will have the column of `something`.
    */
   private def basePaths: Set[Path] = {
-    parameters.get("basePath").map(new Path(_)) match {
+    parameters.get(BASE_PATH_PARAM).map(new Path(_)) match {
       case Some(userDefinedBasePath) =>
         val fs = userDefinedBasePath.getFileSystem(hadoopConf)
         if (!fs.isDirectory(userDefinedBasePath)) {
-          throw new IllegalArgumentException("Option 'basePath' must be a directory")
+          throw new IllegalArgumentException(s"Option '$BASE_PATH_PARAM' must be a directory")
         }
         Set(fs.makeQualified(userDefinedBasePath))
 
       case None =>
-        paths.map { path =>
+        rootPaths.map { path =>
           // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel).
           val qualifiedPath = path.getFileSystem(hadoopConf).makeQualified(path)
           if (leafFiles.contains(qualifiedPath)) qualifiedPath.getParent else qualifiedPath }.toSet
     }
   }
 
+  // SPARK-15895: Metadata files (e.g. Parquet summary files) and temporary files should not be
+  // counted as data files, so that they shouldn't participate partition discovery.
   private def isDataPath(path: Path): Boolean = {
     val name = path.getName
     !((name.startsWith("_") && !name.contains("=")) || name.startsWith("."))
   }
 }
+
+object PartitioningAwareFileCatalog {
+  val BASE_PATH_PARAM = "basePath"
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6ce1b675/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
new file mode 100644
index 0000000..29121a4
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+
+private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
+    case op @ PhysicalOperation(projects, filters,
+        logicalRelation @
+          LogicalRelation(fsRelation @
+            HadoopFsRelation(
+              tableFileCatalog: TableFileCatalog,
+              partitionSchema,
+              _,
+              _,
+              _,
+              _),
+            _,
+            _))
+        if filters.nonEmpty && fsRelation.partitionSchemaOption.isDefined =>
+      // The attribute name of predicate could be different than the one in schema in case of
+      // case insensitive, we should change them to match the one in schema, so we donot need to
+      // worry about case sensitivity anymore.
+      val normalizedFilters = filters.map { e =>
+        e transform {
+          case a: AttributeReference =>
+            a.withName(logicalRelation.output.find(_.semanticEquals(a)).get.name)
+        }
+      }
+
+      val sparkSession = fsRelation.sparkSession
+      val partitionColumns =
+        logicalRelation.resolve(
+          partitionSchema, sparkSession.sessionState.analyzer.resolver)
+      val partitionSet = AttributeSet(partitionColumns)
+      val partitionKeyFilters =
+        ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))
+
+      if (partitionKeyFilters.nonEmpty) {
+        val prunedFileCatalog = tableFileCatalog.filterPartitions(partitionKeyFilters.toSeq)
+        val prunedFsRelation =
+          fsRelation.copy(location = prunedFileCatalog)(sparkSession)
+        val prunedLogicalRelation = logicalRelation.copy(relation = prunedFsRelation)
+
+        // Keep partition-pruning predicates so that they are visible in physical planning
+        val filterExpression = filters.reduceLeft(And)
+        val filter = Filter(filterExpression, prunedLogicalRelation)
+        Project(projects, filter)
+      } else {
+        op
+      }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6ce1b675/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalog.scala
new file mode 100644
index 0000000..4807a92
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalog.scala
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.FileNotFoundException
+
+import scala.collection.mutable
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.metrics.source.HiveCatalogMetrics
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.SerializableConfiguration
+
+
+/**
+ * A base class for [[BasicFileCatalog]]s that need a [[SparkSession]] and the ability to find leaf
+ * files in a list of HDFS paths.
+ *
+ * @param sparkSession a [[SparkSession]]
+ * @param ignoreFileNotFound (see [[ListingFileCatalog]])
+ */
+abstract class SessionFileCatalog(sparkSession: SparkSession)
+    extends BasicFileCatalog with Logging {
+  protected val hadoopConf: Configuration
+
+  /**
+   * List leaf files of given paths. This method will submit a Spark job to do parallel
+   * listing whenever there is a path having more files than the parallel partition discovery
+   * discovery threshold.
+   *
+   * This is publicly visible for testing.
+   */
+  def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
+    val files =
+      if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
+        SessionFileCatalog.listLeafFilesInParallel(paths, hadoopConf, sparkSession)
+      } else {
+        SessionFileCatalog.listLeafFilesInSerial(paths, hadoopConf)
+      }
+
+    HiveCatalogMetrics.incrementFilesDiscovered(files.size)
+    mutable.LinkedHashSet(files: _*)
+  }
+}
+
+object SessionFileCatalog extends Logging {
+
+  /** A serializable variant of HDFS's BlockLocation. */
+  private case class SerializableBlockLocation(
+      names: Array[String],
+      hosts: Array[String],
+      offset: Long,
+      length: Long)
+
+  /** A serializable variant of HDFS's FileStatus. */
+  private case class SerializableFileStatus(
+      path: String,
+      length: Long,
+      isDir: Boolean,
+      blockReplication: Short,
+      blockSize: Long,
+      modificationTime: Long,
+      accessTime: Long,
+      blockLocations: Array[SerializableBlockLocation])
+
+  /**
+   * List a collection of path recursively.
+   */
+  private def listLeafFilesInSerial(
+      paths: Seq[Path],
+      hadoopConf: Configuration): Seq[FileStatus] = {
+    // Dummy jobconf to get to the pathFilter defined in configuration
+    val jobConf = new JobConf(hadoopConf, this.getClass)
+    val filter = FileInputFormat.getInputPathFilter(jobConf)
+
+    paths.flatMap { path =>
+      val fs = path.getFileSystem(hadoopConf)
+      listLeafFiles0(fs, path, filter)
+    }
+  }
+
+  /**
+   * List a collection of path recursively in parallel (using Spark executors).
+   * Each task launched will use [[listLeafFilesInSerial]] to list.
+   */
+  private def listLeafFilesInParallel(
+      paths: Seq[Path],
+      hadoopConf: Configuration,
+      sparkSession: SparkSession): Seq[FileStatus] = {
+    assert(paths.size >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold)
+    logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}")
+
+    val sparkContext = sparkSession.sparkContext
+    val serializableConfiguration = new SerializableConfiguration(hadoopConf)
+    val serializedPaths = paths.map(_.toString)
+
+    // Set the number of parallelism to prevent following file listing from generating many tasks
+    // in case of large #defaultParallelism.
+    val numParallelism = Math.min(paths.size, 10000)
+
+    val statuses = sparkContext
+      .parallelize(serializedPaths, numParallelism)
+      .mapPartitions { paths =>
+        val hadoopConf = serializableConfiguration.value
+        listLeafFilesInSerial(paths.map(new Path(_)).toSeq, hadoopConf).iterator
+      }.map { status =>
+        // Turn FileStatus into SerializableFileStatus so we can send it back to the driver
+        val blockLocations = status match {
+          case f: LocatedFileStatus =>
+            f.getBlockLocations.map { loc =>
+              SerializableBlockLocation(
+                loc.getNames,
+                loc.getHosts,
+                loc.getOffset,
+                loc.getLength)
+            }
+
+          case _ =>
+            Array.empty[SerializableBlockLocation]
+        }
+
+        SerializableFileStatus(
+          status.getPath.toString,
+          status.getLen,
+          status.isDirectory,
+          status.getReplication,
+          status.getBlockSize,
+          status.getModificationTime,
+          status.getAccessTime,
+          blockLocations)
+      }.collect()
+
+    // Turn SerializableFileStatus back to Status
+    statuses.map { f =>
+      val blockLocations = f.blockLocations.map { loc =>
+        new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length)
+      }
+      new LocatedFileStatus(
+        new FileStatus(
+          f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime, new Path(f.path)),
+        blockLocations)
+    }
+  }
+
+  /**
+   * List a single path, provided as a FileStatus, in serial.
+   */
+  private def listLeafFiles0(
+      fs: FileSystem, path: Path, filter: PathFilter): Seq[FileStatus] = {
+    logTrace(s"Listing $path")
+    val name = path.getName.toLowerCase
+    if (shouldFilterOut(name)) {
+      Seq.empty[FileStatus]
+    } else {
+      // [SPARK-17599] Prevent ListingFileCatalog from failing if path doesn't exist
+      // Note that statuses only include FileStatus for the files and dirs directly under path,
+      // and does not include anything else recursively.
+      val statuses = try fs.listStatus(path) catch {
+        case _: FileNotFoundException =>
+          logWarning(s"The directory $path was not found. Was it deleted very recently?")
+          Array.empty[FileStatus]
+      }
+
+      val allLeafStatuses = {
+        val (dirs, files) = statuses.partition(_.isDirectory)
+        val stats = files ++ dirs.flatMap(dir => listLeafFiles0(fs, dir.getPath, filter))
+        if (filter != null) stats.filter(f => filter.accept(f.getPath)) else stats
+      }
+
+      allLeafStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
+        case f: LocatedFileStatus =>
+          f
+
+        // NOTE:
+        //
+        // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
+        //   operations, calling `getFileBlockLocations` does no harm here since these file system
+        //   implementations don't actually issue RPC for this method.
+        //
+        // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
+        //   be a big deal since we always use to `listLeafFilesInParallel` when the number of
+        //   paths exceeds threshold.
+        case f =>
+          // The other constructor of LocatedFileStatus will call FileStatus.getPermission(),
+          // which is very slow on some file systems (e.g. RawLocalFileSystem, which launches a
+          // subprocess and parses the stdout).
+          val locations = fs.getFileBlockLocations(f, 0, f.getLen)
+          val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
+            f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
+          if (f.isSymlink) {
+            lfs.setSymlink(f.getSymlink)
+          }
+          lfs
+      }
+    }
+  }
+
+  /** Checks if we should filter out this path name. */
+  def shouldFilterOut(pathName: String): Boolean = {
+    // We filter everything that starts with _ and ., except _common_metadata and _metadata
+    // because Parquet needs to find those metadata files from leaf files returned by this method.
+    // We should refactor this logic to not mix metadata files with data files.
+    ((pathName.startsWith("_") && !pathName.contains("=")) || pathName.startsWith(".")) &&
+      !pathName.startsWith("_common_metadata") && !pathName.startsWith("_metadata")
+  }
+}
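
A note on the control flow here: the parallel path above is only taken once the number of
paths reaches the session's parallel partition discovery threshold; below that, listing stays
serial on the driver. A minimal dispatch sketch, assuming the `listLeafFilesInSerial` and
`listLeafFilesInParallel` helpers defined in this file (the wrapper name is hypothetical):

    import org.apache.hadoop.fs.{FileStatus, Path}
    import org.apache.spark.sql.SparkSession

    // Hypothetical entry point: pick serial (driver-side) or parallel (Spark job) listing
    // based on the threshold checked by the assert in listLeafFilesInParallel.
    def listLeafFiles(paths: Seq[Path], spark: SparkSession): Seq[FileStatus] = {
      val hadoopConf = spark.sessionState.newHadoopConf()
      if (paths.size < spark.sessionState.conf.parallelPartitionDiscoveryThreshold) {
        listLeafFilesInSerial(paths, hadoopConf)
      } else {
        listLeafFilesInParallel(paths, hadoopConf, spark)
      }
    }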

http://git-wip-us.apache.org/repos/asf/spark/blob/6ce1b675/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
new file mode 100644
index 0000000..a5c41b2
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types.StructType
+
+
+/**
+ * A [[BasicFileCatalog]] for a metastore catalog table.
+ *
+ * @param sparkSession a [[SparkSession]]
+ * @param db the table's database name
+ * @param table the table's (unqualified) name
+ * @param partitionSchema the schema of a partitioned table's partition columns
+ * @param sizeInBytes the table's data size in bytes
+ */
+class TableFileCatalog(
+    sparkSession: SparkSession,
+    db: String,
+    table: String,
+    partitionSchema: Option[StructType],
+    override val sizeInBytes: Long)
+  extends SessionFileCatalog(sparkSession) {
+
+  override protected val hadoopConf = sparkSession.sessionState.newHadoopConf
+
+  private val externalCatalog = sparkSession.sharedState.externalCatalog
+
+  private val catalogTable = externalCatalog.getTable(db, table)
+
+  private val baseLocation = catalogTable.storage.locationUri
+
+  override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq
+
+  override def listFiles(filters: Seq[Expression]): Seq[Partition] = {
+    filterPartitions(filters).listFiles(Nil)
+  }
+
+  override def refresh(): Unit = {}
+
+  /**
+   * Returns a [[ListingFileCatalog]] for this table restricted to the subset of partitions
+   * specified by the given partition-pruning filters.
+   *
+   * @param filters partition-pruning filters
+   */
+  def filterPartitions(filters: Seq[Expression]): ListingFileCatalog = {
+    if (filters.isEmpty) {
+      cachedAllPartitions
+    } else {
+      filterPartitions0(filters)
+    }
+  }
+
+  private def filterPartitions0(filters: Seq[Expression]): ListingFileCatalog = {
+    val parameters = baseLocation
+      .map(loc => Map(PartitioningAwareFileCatalog.BASE_PATH_PARAM -> loc))
+      .getOrElse(Map.empty)
+    partitionSchema match {
+      case Some(schema) =>
+        val selectedPartitions = externalCatalog.listPartitionsByFilter(db, table, filters)
+        val partitions = selectedPartitions.map { p =>
+          PartitionDirectory(p.toRow(schema), p.storage.locationUri.get)
+        }
+        val partitionSpec = PartitionSpec(schema, partitions)
+        new PrunedTableFileCatalog(
+          sparkSession, new Path(baseLocation.get), partitionSpec)
+      case None =>
+        new ListingFileCatalog(sparkSession, rootPaths, parameters, None)
+    }
+  }
+
+  // Not used in the hot path of queries when metastore partition pruning is enabled
+  lazy val cachedAllPartitions: ListingFileCatalog = filterPartitions0(Nil)
+
+  override def inputFiles: Array[String] = cachedAllPartitions.inputFiles
+}
+
+/**
+ * An override of the standard HDFS listing-based catalog that overrides the partition spec
+ * with the information from the metastore.
+ *
+ * @param tableBasePath The default base path of the Hive metastore table
+ * @param partitionSpec The partition specifications from Hive metastore
+ */
+private class PrunedTableFileCatalog(
+    sparkSession: SparkSession,
+    tableBasePath: Path,
+    override val partitionSpec: PartitionSpec)
+  extends ListingFileCatalog(
+    sparkSession,
+    partitionSpec.partitions.map(_.path),
+    Map.empty,
+    Some(partitionSpec.partitionColumns))
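
For context, a hypothetical usage sketch of `TableFileCatalog`; the database, table, and the
`ds` partition column are made-up names, and `spark` is assumed to be an active `SparkSession`:

    import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
    import org.apache.spark.sql.types.{StringType, StructType}

    val partitionSchema = new StructType().add("ds", StringType)
    val catalog = new TableFileCatalog(spark, "sales_db", "events", Some(partitionSchema), 0L)

    // Only partitions satisfying ds = '2016-10-01' are fetched from the metastore and listed.
    val ds = AttributeReference("ds", StringType)()
    val pruned = catalog.filterPartitions(Seq(EqualTo(ds, Literal("2016-10-01"))))
    val selectedFiles = pruned.listFiles(Nil)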

http://git-wip-us.apache.org/repos/asf/spark/blob/6ce1b675/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
index f1a35dd..4dea8cf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
@@ -269,11 +269,15 @@ private[parquet] object ParquetReadSupport {
    */
   private def clipParquetGroupFields(
       parquetRecord: GroupType, structType: StructType): Seq[Type] = {
-    val parquetFieldMap = parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap
+    val parquetFieldMap = parquetRecord.getFields.asScala
+      .map(f => f.getName -> f).toMap
+    val caseInsensitiveParquetFieldMap = parquetRecord.getFields.asScala
+      .map(f => f.getName.toLowerCase -> f).toMap
     val toParquet = new ParquetSchemaConverter(writeLegacyParquetFormat = false)
     structType.map { f =>
       parquetFieldMap
         .get(f.name)
+        .orElse(caseInsensitiveParquetFieldMap.get(f.name.toLowerCase))
         .map(clipParquetType(_, f.dataType))
         .getOrElse(toParquet.convertField(f))
     }
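
The added fallback only engages when an exact-case lookup misses. A minimal illustration of
the lookup order with plain maps (not the actual `ParquetReadSupport` code):

    val exactFieldMap = Map("A" -> "required group A", "c" -> "optional int32 c")
    val caseInsensitiveFieldMap = exactFieldMap.map { case (k, v) => k.toLowerCase -> v }

    def resolve(name: String): Option[String] =
      exactFieldMap.get(name).orElse(caseInsensitiveFieldMap.get(name.toLowerCase))

    resolve("c")  // Some("optional int32 c"), via the exact match
    resolve("a")  // Some("required group A"), via the case-insensitive fallback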

http://git-wip-us.apache.org/repos/asf/spark/blob/6ce1b675/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala
index a32c467..82b67cb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala
@@ -47,7 +47,7 @@ class MetadataLogFileCatalog(sparkSession: SparkSession, path: Path)
     allFilesFromLog.toArray.groupBy(_.getPath.getParent)
   }
 
-  override def paths: Seq[Path] = path :: Nil
+  override def rootPaths: Seq[Path] = path :: Nil
 
   override def refresh(): Unit = { }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6ce1b675/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index c844765..e73d018 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -269,6 +269,13 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val HIVE_FILESOURCE_PARTITION_PRUNING =
+    SQLConfigBuilder("spark.sql.hive.filesourcePartitionPruning")
+      .doc("When true, enable metastore partition pruning for file source tables as well. " +
+           "This is currently implemented for converted Hive tables only.")
+      .booleanConf
+      .createWithDefault(true)
+
   val OPTIMIZER_METADATA_ONLY = SQLConfigBuilder("spark.sql.optimizer.metadataOnly")
     .doc("When true, enable the metadata-only query optimization that use the table's metadata " +
       "to produce the partition columns instead of table scans. It applies when all the columns " +
@@ -676,6 +683,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def metastorePartitionPruning: Boolean = getConf(HIVE_METASTORE_PARTITION_PRUNING)
 
+  def filesourcePartitionPruning: Boolean = getConf(HIVE_FILESOURCE_PARTITION_PRUNING)
+
   def gatherFastStats: Boolean = getConf(GATHER_FASTSTAT)
 
   def optimizerMetadataOnly: Boolean = getConf(OPTIMIZER_METADATA_ONLY)
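
Since the new flag is an ordinary session conf defaulting to true, it can be turned off per
session, e.g.:

    // Fall back to the old eager partition listing for converted Hive tables in this session.
    spark.conf.set("spark.sql.hive.filesourcePartitionPruning", "false")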

http://git-wip-us.apache.org/repos/asf/spark/blob/6ce1b675/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
index fa3abd0..2695974 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
@@ -77,13 +77,14 @@ class FileCatalogSuite extends SharedSQLContext {
       val catalog1 = new ListingFileCatalog(
         spark, Seq(new Path(deletedFolder.getCanonicalPath)), Map.empty, None)
       // doesn't throw an exception
-      assert(catalog1.listLeafFiles(catalog1.paths).isEmpty)
+      assert(catalog1.listLeafFiles(catalog1.rootPaths).isEmpty)
     }
   }
 
   test("SPARK-17613 - PartitioningAwareFileCatalog: base path w/o '/' at end") {
     class MockCatalog(
-      override val paths: Seq[Path]) extends PartitioningAwareFileCatalog(spark, Map.empty, None) {
+      override val rootPaths: Seq[Path])
+      extends PartitioningAwareFileCatalog(spark, Map.empty, None) {
 
       override def refresh(): Unit = {}
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6ce1b675/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index c5deb31..c32254d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -395,7 +395,7 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
 
         val fileCatalog = new ListingFileCatalog(
           sparkSession = spark,
-          paths = Seq(new Path(tempDir)),
+          rootPaths = Seq(new Path(tempDir)),
           parameters = Map.empty[String, String],
           partitionSchema = None)
         // This should not fail.

http://git-wip-us.apache.org/repos/asf/spark/blob/6ce1b675/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalogSuite.scala
deleted file mode 100644
index f15730a..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalogSuite.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources
-
-import org.apache.spark.SparkFunSuite
-
-class ListingFileCatalogSuite extends SparkFunSuite {
-
-  test("file filtering") {
-    assert(!ListingFileCatalog.shouldFilterOut("abcd"))
-    assert(ListingFileCatalog.shouldFilterOut(".ab"))
-    assert(ListingFileCatalog.shouldFilterOut("_cd"))
-
-    assert(!ListingFileCatalog.shouldFilterOut("_metadata"))
-    assert(!ListingFileCatalog.shouldFilterOut("_common_metadata"))
-    assert(ListingFileCatalog.shouldFilterOut("_ab_metadata"))
-    assert(ListingFileCatalog.shouldFilterOut("_cd_common_metadata"))
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/6ce1b675/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalogSuite.scala
new file mode 100644
index 0000000..df50958
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalogSuite.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.SparkFunSuite
+
+class SessionFileCatalogSuite extends SparkFunSuite {
+
+  test("file filtering") {
+    assert(!SessionFileCatalog.shouldFilterOut("abcd"))
+    assert(SessionFileCatalog.shouldFilterOut(".ab"))
+    assert(SessionFileCatalog.shouldFilterOut("_cd"))
+
+    assert(!SessionFileCatalog.shouldFilterOut("_metadata"))
+    assert(!SessionFileCatalog.shouldFilterOut("_common_metadata"))
+    assert(SessionFileCatalog.shouldFilterOut("_ab_metadata"))
+    assert(SessionFileCatalog.shouldFilterOut("_cd_common_metadata"))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6ce1b675/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 8d18be9..43357c9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -30,7 +30,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitionDirectory => Partition, PartitioningUtils, PartitionSpec}
+import org.apache.spark.sql.execution.datasources.{FileCatalog, HadoopFsRelation, LogicalRelation, PartitionDirectory => Partition, PartitioningUtils, PartitionSpec}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
@@ -626,8 +626,8 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
       (1 to 10).map(i => (i, i.toString)).toDF("a", "b").write.parquet(dir.getCanonicalPath)
       val queryExecution = spark.read.parquet(dir.getCanonicalPath).queryExecution
       queryExecution.analyzed.collectFirst {
-        case LogicalRelation(relation: HadoopFsRelation, _, _) =>
-          assert(relation.partitionSpec === PartitionSpec.emptySpec)
+        case LogicalRelation(HadoopFsRelation(location: FileCatalog, _, _, _, _, _), _, _) =>
+          assert(location.partitionSpec === PartitionSpec.emptySpec)
       }.getOrElse {
         fail(s"Expecting a ParquetRelation2, but got:\n$queryExecution")
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/6ce1b675/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 8a980a7..c3d202c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -1081,6 +1081,34 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
   }
 
   testSchemaClipping(
+    "falls back to case insensitive resolution",
+
+    parquetSchema =
+      """message root {
+        |  required group A {
+        |    optional int32 B;
+        |  }
+        |  optional int32 c;
+        |}
+      """.stripMargin,
+
+    catalystSchema = {
+      val nestedType = new StructType().add("b", IntegerType, nullable = true)
+      new StructType()
+        .add("a", nestedType, nullable = true)
+        .add("c", IntegerType, nullable = true)
+    },
+
+    expectedSchema =
+      """message root {
+        |  required group A {
+        |    optional int32 B;
+        |  }
+        |  optional int32 c;
+        |}
+      """.stripMargin)
+
+  testSchemaClipping(
     "simple nested struct",
 
     parquetSchema =

http://git-wip-us.apache.org/repos/asf/spark/blob/6ce1b675/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index b5d93c3..ff59b54 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -29,17 +29,17 @@ import org.apache.thrift.TException
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Statistics}
 import org.apache.spark.sql.execution.command.{ColumnStatStruct, DDLUtils}
 import org.apache.spark.sql.execution.datasources.CaseInsensitiveMap
 import org.apache.spark.sql.hive.client.HiveClient
 import org.apache.spark.sql.internal.HiveSerDe
 import org.apache.spark.sql.internal.StaticSQLConf._
-import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
 
 
 /**
@@ -650,8 +650,35 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   override def listPartitionsByFilter(
       db: String,
       table: String,
-      predicates: Seq[Expression]): Seq[CatalogTablePartition] = {
-    client.getPartitionsByFilter(db, table, predicates)
+      predicates: Seq[Expression]): Seq[CatalogTablePartition] = withClient {
+    val catalogTable = client.getTable(db, table)
+    val partitionColumnNames = catalogTable.partitionColumnNames.toSet
+    val nonPartitionPruningPredicates = predicates.filterNot {
+      _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
+    }
+
+    if (nonPartitionPruningPredicates.nonEmpty) {
+      sys.error("Expected only partition pruning predicates: " +
+        predicates.reduceLeft(And))
+    }
+
+    val partitionSchema = catalogTable.partitionSchema
+
+    if (predicates.nonEmpty) {
+      val clientPrunedPartitions =
+        client.getPartitionsByFilter(catalogTable, predicates)
+      val boundPredicate =
+        InterpretedPredicate.create(predicates.reduce(And).transform {
+          case att: AttributeReference =>
+            val index = partitionSchema.indexWhere(_.name == att.name)
+            BoundReference(index, partitionSchema(index).dataType, nullable = true)
+        })
+      clientPrunedPartitions.filter { case p: CatalogTablePartition =>
+        boundPredicate(p.toRow(partitionSchema))
+      }
+    } else {
+      client.getPartitions(catalogTable)
+    }
   }
 
   // --------------------------------------------------------------------------
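
A hypothetical call sketch for `listPartitionsByFilter`; the database, table, and `ds`
partition column are made-up names:

    import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
    import org.apache.spark.sql.types.StringType

    val externalCatalog = spark.sharedState.externalCatalog
    val ds = AttributeReference("ds", StringType)()
    val matching = externalCatalog.listPartitionsByFilter(
      "sales_db", "events", Seq(EqualTo(ds, Literal("2016-10-01"))))
    // Only metadata for the matching partitions is fetched from the Hive metastore; the
    // returned partitions are then re-checked on the driver with the InterpretedPredicate
    // built above.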

http://git-wip-us.apache.org/repos/asf/spark/blob/6ce1b675/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index c44f0ad..4a2aaa7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -135,12 +135,12 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
 
   private def getCached(
       tableIdentifier: QualifiedTableName,
-      pathsInMetastore: Seq[String],
+      pathsInMetastore: Seq[Path],
       metastoreRelation: MetastoreRelation,
       schemaInMetastore: StructType,
       expectedFileFormat: Class[_ <: FileFormat],
       expectedBucketSpec: Option[BucketSpec],
-      partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
+      partitionSchema: Option[StructType]): Option[LogicalRelation] = {
 
     cachedDataSourceTables.getIfPresent(tableIdentifier) match {
       case null => None // Cache miss
@@ -152,12 +152,10 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
             // If we have the same paths, same schema, and same partition spec,
             // we will use the cached relation.
             val useCached =
-              relation.location.paths.map(_.toString).toSet == pathsInMetastore.toSet &&
+              relation.location.rootPaths.toSet == pathsInMetastore.toSet &&
                 logical.schema.sameType(schemaInMetastore) &&
                 relation.bucketSpec == expectedBucketSpec &&
-                relation.partitionSpec == partitionSpecInMetastore.getOrElse {
-                  PartitionSpec(StructType(Nil), Array.empty[PartitionDirectory])
-                }
+                relation.partitionSchema == partitionSchema.getOrElse(StructType(Nil))
 
             if (useCached) {
               Some(logical)
@@ -196,61 +194,59 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
       QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
     val bucketSpec = None  // We don't support hive bucketed tables, only ones we write out.
 
+    val lazyPruningEnabled = sparkSession.sqlContext.conf.filesourcePartitionPruning
     val result = if (metastoreRelation.hiveQlTable.isPartitioned) {
       val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys)
-      val partitionColumnDataTypes = partitionSchema.map(_.dataType)
-      // We're converting the entire table into HadoopFsRelation, so predicates to Hive metastore
-      // are empty.
-      val partitions = metastoreRelation.getHiveQlPartitions().map { p =>
-        val location = p.getLocation
-        val values = InternalRow.fromSeq(p.getValues.asScala.zip(partitionColumnDataTypes).map {
-          case (rawValue, dataType) => Cast(Literal(rawValue), dataType).eval(null)
-        })
-        PartitionDirectory(values, location)
-      }
-      val partitionSpec = PartitionSpec(partitionSchema, partitions)
-      val partitionPaths = partitions.map(_.path.toString)
-
-      // By convention (for example, see MetaStorePartitionedTableFileCatalog), the definition of a
-      // partitioned table's paths depends on whether that table has any actual partitions.
-      // Partitioned tables without partitions use the location of the table's base path.
-      // Partitioned tables with partitions use the locations of those partitions' data locations,
-      // _omitting_ the table's base path.
-      val paths = if (partitionPaths.isEmpty) {
-        Seq(metastoreRelation.hiveQlTable.getDataLocation.toString)
+
+      val rootPaths: Seq[Path] = if (lazyPruningEnabled) {
+        Seq(metastoreRelation.hiveQlTable.getDataLocation)
       } else {
-        partitionPaths
+        // By convention (for example, see TableFileCatalog), the definition of a
+        // partitioned table's paths depends on whether that table has any actual partitions.
+        // Partitioned tables without partitions use the location of the table's base path.
+        // Partitioned tables with partitions use those partitions' data locations,
+        // _omitting_ the table's base path.
+        val paths = metastoreRelation.getHiveQlPartitions().map { p =>
+          new Path(p.getLocation)
+        }
+        if (paths.isEmpty) {
+          Seq(metastoreRelation.hiveQlTable.getDataLocation)
+        } else {
+          paths
+        }
       }
 
       val cached = getCached(
         tableIdentifier,
-        paths,
+        rootPaths,
         metastoreRelation,
         metastoreSchema,
         fileFormatClass,
         bucketSpec,
-        Some(partitionSpec))
-
-      val hadoopFsRelation = cached.getOrElse {
-        val fileCatalog = new MetaStorePartitionedTableFileCatalog(
-          sparkSession,
-          new Path(metastoreRelation.catalogTable.storage.locationUri.get),
-          partitionSpec)
-
-        val inferredSchema = if (fileType.equals("parquet")) {
-          val inferredSchema =
-            defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles())
-          inferredSchema.map { inferred =>
-            ParquetFileFormat.mergeMetastoreParquetSchema(metastoreSchema, inferred)
-          }.getOrElse(metastoreSchema)
-        } else {
-          defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles()).get
+        Some(partitionSchema))
+
+      val logicalRelation = cached.getOrElse {
+        val db = metastoreRelation.databaseName
+        val table = metastoreRelation.tableName
+        val sizeInBytes = metastoreRelation.statistics.sizeInBytes.toLong
+        val fileCatalog = {
+          val catalog = new TableFileCatalog(
+            sparkSession, db, table, Some(partitionSchema), sizeInBytes)
+          if (lazyPruningEnabled) {
+            catalog
+          } else {
+            catalog.cachedAllPartitions
+          }
         }
+        val partitionSchemaColumnNames = partitionSchema.map(_.name.toLowerCase).toSet
+        val dataSchema =
+          StructType(metastoreSchema
+            .filterNot(field => partitionSchemaColumnNames.contains(field.name.toLowerCase)))
 
         val relation = HadoopFsRelation(
           location = fileCatalog,
           partitionSchema = partitionSchema,
-          dataSchema = inferredSchema,
+          dataSchema = dataSchema,
           bucketSpec = bucketSpec,
           fileFormat = defaultSource,
           options = options)(sparkSession = sparkSession)
@@ -260,12 +256,12 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         created
       }
 
-      hadoopFsRelation
+      logicalRelation
     } else {
-      val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString)
+      val rootPath = metastoreRelation.hiveQlTable.getDataLocation
 
       val cached = getCached(tableIdentifier,
-        paths,
+        Seq(rootPath),
         metastoreRelation,
         metastoreSchema,
         fileFormatClass,
@@ -276,14 +272,13 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
           LogicalRelation(
             DataSource(
               sparkSession = sparkSession,
-              paths = paths,
+              paths = rootPath.toString :: Nil,
               userSpecifiedSchema = Some(metastoreRelation.schema),
               bucketSpec = bucketSpec,
               options = options,
               className = fileType).resolveRelation(),
               catalogTable = Some(metastoreRelation.catalogTable))
 
-
         cachedDataSourceTables.put(tableIdentifier, created)
         created
       }
@@ -371,34 +366,3 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     }
   }
 }
-
-/**
- * An override of the standard HDFS listing based catalog, that overrides the partition spec with
- * the information from the metastore.
- *
- * @param tableBasePath The default base path of the Hive metastore table
- * @param partitionSpec The partition specifications from Hive metastore
- */
-private[hive] class MetaStorePartitionedTableFileCatalog(
-    sparkSession: SparkSession,
-    tableBasePath: Path,
-    override val partitionSpec: PartitionSpec)
-  extends ListingFileCatalog(
-    sparkSession,
-    MetaStorePartitionedTableFileCatalog.getPaths(tableBasePath, partitionSpec),
-    Map.empty,
-    Some(partitionSpec.partitionColumns)) {
-}
-
-private[hive] object MetaStorePartitionedTableFileCatalog {
-  /** Get the list of paths to list files in the for a metastore table */
-  def getPaths(tableBasePath: Path, partitionSpec: PartitionSpec): Seq[Path] = {
-    // If there are no partitions currently specified then use base path,
-    // otherwise use the paths corresponding to the partitions.
-    if (partitionSpec.partitions.isEmpty) {
-      Seq(tableBasePath)
-    } else {
-      partitionSpec.partitions.map(_.path)
-    }
-  }
-}
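
With schema inference removed from the partitioned-table path above, the data schema is
simply the metastore schema minus the partition columns. A minimal illustration with
made-up column names:

    import org.apache.spark.sql.types.{LongType, StringType, StructType}

    val metastoreSchema = new StructType()
      .add("id", LongType).add("value", StringType).add("ds", StringType)
    val partitionSchema = new StructType().add("ds", StringType)

    val partitionSchemaColumnNames = partitionSchema.map(_.name.toLowerCase).toSet
    val dataSchema = StructType(
      metastoreSchema.filterNot(f => partitionSchemaColumnNames.contains(f.name.toLowerCase)))
    // dataSchema contains only `id` and `value`; `ds` is supplied by the partition schema.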


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org