You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by yh...@apache.org on 2016/10/27 21:22:36 UTC

[2/2] spark git commit: [SPARK-17970][SQL] store partition spec in metastore for data source table

[SPARK-17970][SQL] store partition spec in metastore for data source table

## What changes were proposed in this pull request?

We should follow hive table and also store partition spec in metastore for data source table.
This brings 2 benefits:

1. It's more flexible to manage the table data files, as users can use `ADD PARTITION`, `DROP PARTITION` and `RENAME PARTITION`
2. We don't need to cache all file status for data source table anymore.

## How was this patch tested?

existing tests.

Author: Eric Liang <ek...@databricks.com>
Author: Michael Allman <mi...@videoamp.com>
Author: Eric Liang <ek...@gmail.com>
Author: Wenchen Fan <we...@databricks.com>

Closes #15515 from cloud-fan/partition.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ccb11543
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ccb11543
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ccb11543

Branch: refs/heads/master
Commit: ccb11543048dccd4cc590a8db1df1d9d5847d112
Parents: 79fd0cc
Author: Eric Liang <ek...@databricks.com>
Authored: Thu Oct 27 14:22:30 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Thu Oct 27 14:22:30 2016 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/catalog/interface.scala  |  12 +-
 .../sql/catalyst/trees/TreeNodeSuite.scala      |   1 +
 .../org/apache/spark/sql/DataFrameWriter.scala  |  13 +-
 .../command/AnalyzeColumnCommand.scala          |   3 +-
 .../execution/command/AnalyzeTableCommand.scala |   3 +-
 .../command/createDataSourceTables.scala        |  17 +-
 .../spark/sql/execution/command/ddl.scala       |  90 +++---
 .../spark/sql/execution/command/tables.scala    |  39 +--
 .../sql/execution/datasources/DataSource.scala  |  20 +-
 .../datasources/DataSourceStrategy.scala        |  15 +-
 .../sql/execution/datasources/FileCatalog.scala |   4 +
 .../execution/datasources/FileStatusCache.scala |   2 +-
 .../PartitioningAwareFileCatalog.scala          |  12 +-
 .../datasources/TableFileCatalog.scala          |   4 +-
 .../org/apache/spark/sql/internal/SQLConf.scala |  16 +-
 .../apache/spark/sql/SQLQueryTestSuite.scala    |   2 +-
 .../spark/sql/execution/command/DDLSuite.scala  | 200 +++++-------
 .../spark/sql/hive/HiveExternalCatalog.scala    | 129 +++++---
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |   9 +-
 .../spark/sql/hive/client/HiveClientImpl.scala  |   5 +-
 .../spark/sql/hive/HiveMetadataCacheSuite.scala |   2 +-
 .../sql/hive/HiveTablePerfStatsSuite.scala      | 240 ---------------
 .../PartitionProviderCompatibilitySuite.scala   | 137 +++++++++
 .../hive/PartitionedTablePerfStatsSuite.scala   | 304 +++++++++++++++++++
 .../apache/spark/sql/hive/StatisticsSuite.scala |  65 ++--
 .../sql/hive/execution/HiveCommandSuite.scala   |   5 +-
 .../sql/hive/execution/SQLQuerySuite.scala      |   8 +-
 27 files changed, 812 insertions(+), 545 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ccb11543/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index a97ed70..7c3bec8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -89,9 +89,10 @@ case class CatalogTablePartition(
     parameters: Map[String, String] = Map.empty) {
 
   override def toString: String = {
+    val specString = spec.map { case (k, v) => s"$k=$v" }.mkString(", ")
     val output =
       Seq(
-        s"Partition Values: [${spec.values.mkString(", ")}]",
+        s"Partition Values: [$specString]",
         s"$storage",
         s"Partition Parameters:{${parameters.map(p => p._1 + "=" + p._2).mkString(", ")}}")
 
@@ -137,6 +138,8 @@ case class BucketSpec(
  *                 Can be None if this table is a View, should be "hive" for hive serde tables.
  * @param unsupportedFeatures is a list of string descriptions of features that are used by the
  *        underlying table but not supported by Spark SQL yet.
+ * @param partitionProviderIsHive whether this table's partition metadata is stored in the Hive
+ *                                metastore.
  */
 case class CatalogTable(
     identifier: TableIdentifier,
@@ -154,7 +157,8 @@ case class CatalogTable(
     viewOriginalText: Option[String] = None,
     viewText: Option[String] = None,
     comment: Option[String] = None,
-    unsupportedFeatures: Seq[String] = Seq.empty) {
+    unsupportedFeatures: Seq[String] = Seq.empty,
+    partitionProviderIsHive: Boolean = false) {
 
   /** schema of this table's partition columns */
   def partitionSchema: StructType = StructType(schema.filter {
@@ -212,11 +216,11 @@ case class CatalogTable(
         comment.map("Comment: " + _).getOrElse(""),
         if (properties.nonEmpty) s"Properties: $tableProperties" else "",
         if (stats.isDefined) s"Statistics: ${stats.get.simpleString}" else "",
-        s"$storage")
+        s"$storage",
+        if (partitionProviderIsHive) "Partition Provider: Hive" else "")
 
     output.filter(_.nonEmpty).mkString("CatalogTable(\n\t", "\n\t", ")")
   }
-
 }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ccb11543/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
index cb0426c..3eff12f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
@@ -489,6 +489,7 @@ class TreeNodeSuite extends SparkFunSuite {
         "owner" -> "",
         "createTime" -> 0,
         "lastAccessTime" -> -1,
+        "partitionProviderIsHive" -> false,
         "properties" -> JNull,
         "unsupportedFeatures" -> List.empty[String]))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ccb11543/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 4b5f024..7ff3522 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -25,7 +25,8 @@ import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType}
-import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Union}
+import org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand
 import org.apache.spark.sql.execution.datasources.{CaseInsensitiveMap, CreateTable, DataSource, HadoopFsRelation}
 import org.apache.spark.sql.types.StructType
 
@@ -387,7 +388,15 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
           partitionColumnNames = partitioningColumns.getOrElse(Nil),
           bucketSpec = getBucketSpec
         )
-        val cmd = CreateTable(tableDesc, mode, Some(df.logicalPlan))
+        val createCmd = CreateTable(tableDesc, mode, Some(df.logicalPlan))
+        val cmd = if (tableDesc.partitionColumnNames.nonEmpty &&
+            df.sparkSession.sqlContext.conf.manageFilesourcePartitions) {
+          // Need to recover partitions into the metastore so our saved data is visible.
+          val recoverPartitionCmd = AlterTableRecoverPartitionsCommand(tableDesc.identifier)
+          Union(createCmd, recoverPartitionCmd)
+        } else {
+          createCmd
+        }
         df.sparkSession.sessionState.executePlan(cmd).toRdd
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/ccb11543/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
index 4881387..f873f34 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
@@ -50,7 +50,8 @@ case class AnalyzeColumnCommand(
           AnalyzeTableCommand.calculateTotalSize(sessionState, catalogRel.catalogTable))
 
       case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined =>
-        updateStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes)
+        updateStats(logicalRel.catalogTable.get,
+          AnalyzeTableCommand.calculateTotalSize(sessionState, logicalRel.catalogTable.get))
 
       case otherRelation =>
         throw new AnalysisException("ANALYZE TABLE is not supported for " +

http://git-wip-us.apache.org/repos/asf/spark/blob/ccb11543/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
index 7b0e49b..52a8fc8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
@@ -51,7 +51,8 @@ case class AnalyzeTableCommand(
 
       // data source tables have been converted into LogicalRelations
       case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined =>
-        updateTableStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes)
+        updateTableStats(logicalRel.catalogTable.get,
+          AnalyzeTableCommand.calculateTotalSize(sessionState, logicalRel.catalogTable.get))
 
       case otherRelation =>
         throw new AnalysisException("ANALYZE TABLE is not supported for " +

http://git-wip-us.apache.org/repos/asf/spark/blob/ccb11543/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index a8c75a7..2a97431 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -94,10 +94,16 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
     val newTable = table.copy(
       storage = table.storage.copy(properties = optionsWithPath),
       schema = dataSource.schema,
-      partitionColumnNames = partitionColumnNames)
+      partitionColumnNames = partitionColumnNames,
+      // If metastore partition management for file source tables is enabled, we start off with
+      // partition provider hive, but no partitions in the metastore. The user has to call
+      // `msck repair table` to populate the table partitions.
+      partitionProviderIsHive = partitionColumnNames.nonEmpty &&
+        sparkSession.sessionState.conf.manageFilesourcePartitions)
     // We will return Nil or throw exception at the beginning if the table already exists, so when
     // we reach here, the table should not exist and we should set `ignoreIfExists` to false.
     sessionState.catalog.createTable(newTable, ignoreIfExists = false)
+
     Seq.empty[Row]
   }
 }
@@ -232,6 +238,15 @@ case class CreateDataSourceTableAsSelectCommand(
       sessionState.catalog.createTable(newTable, ignoreIfExists = false)
     }
 
+    result match {
+      case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
+          sparkSession.sqlContext.conf.manageFilesourcePartitions =>
+        // Need to recover partitions into the metastore so our saved data is visible.
+        sparkSession.sessionState.executePlan(
+          AlterTableRecoverPartitionsCommand(table.identifier)).toRdd
+      case _ =>
+    }
+
     // Refresh the cache of the table in the catalog.
     sessionState.catalog.refreshTable(tableIdentWithDB)
     Seq.empty[Row]

http://git-wip-us.apache.org/repos/asf/spark/blob/ccb11543/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 15656fa..61e0550 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -28,10 +28,11 @@ import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTablePartition, CatalogTableType, SessionCatalog}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
-import org.apache.spark.sql.execution.datasources.PartitioningUtils
+import org.apache.spark.sql.execution.datasources.{CaseInsensitiveMap, PartitioningUtils}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.SerializableConfiguration
 
@@ -346,10 +347,7 @@ case class AlterTableAddPartitionCommand(
     val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
-    if (DDLUtils.isDatasourceTable(table)) {
-      throw new AnalysisException(
-        "ALTER TABLE ADD PARTITION is not allowed for tables defined using the datasource API")
-    }
+    DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE ADD PARTITION")
     val parts = partitionSpecsAndLocs.map { case (spec, location) =>
       val normalizedSpec = PartitioningUtils.normalizePartitionSpec(
         spec,
@@ -382,11 +380,8 @@ case class AlterTableRenamePartitionCommand(
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
-    if (DDLUtils.isDatasourceTable(table)) {
-      throw new AnalysisException(
-        "ALTER TABLE RENAME PARTITION is not allowed for tables defined using the datasource API")
-    }
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
+    DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE RENAME PARTITION")
 
     val normalizedOldPartition = PartitioningUtils.normalizePartitionSpec(
       oldPartition,
@@ -432,10 +427,7 @@ case class AlterTableDropPartitionCommand(
     val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
-    if (DDLUtils.isDatasourceTable(table)) {
-      throw new AnalysisException(
-        "ALTER TABLE DROP PARTITIONS is not allowed for tables defined using the datasource API")
-    }
+    DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")
 
     val normalizedSpecs = specs.map { spec =>
       PartitioningUtils.normalizePartitionSpec(
@@ -493,33 +485,39 @@ case class AlterTableRecoverPartitionsCommand(
     }
   }
 
+  private def getBasePath(table: CatalogTable): Option[String] = {
+    if (table.provider == Some("hive")) {
+      table.storage.locationUri
+    } else {
+      new CaseInsensitiveMap(table.storage.properties).get("path")
+    }
+  }
+
   override def run(spark: SparkSession): Seq[Row] = {
     val catalog = spark.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
     val tableIdentWithDB = table.identifier.quotedString
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
-    if (DDLUtils.isDatasourceTable(table)) {
-      throw new AnalysisException(
-        s"Operation not allowed: $cmd on datasource tables: $tableIdentWithDB")
-    }
     if (table.partitionColumnNames.isEmpty) {
       throw new AnalysisException(
         s"Operation not allowed: $cmd only works on partitioned tables: $tableIdentWithDB")
     }
-    if (table.storage.locationUri.isEmpty) {
+
+    val tablePath = getBasePath(table)
+    if (tablePath.isEmpty) {
       throw new AnalysisException(s"Operation not allowed: $cmd only works on table with " +
         s"location provided: $tableIdentWithDB")
     }
 
-    val root = new Path(table.storage.locationUri.get)
+    val root = new Path(tablePath.get)
     logInfo(s"Recover all the partitions in $root")
     val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
 
     val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt
     val hadoopConf = spark.sparkContext.hadoopConfiguration
     val pathFilter = getPathFilter(hadoopConf)
-    val partitionSpecsAndLocs = scanPartitions(
-      spark, fs, pathFilter, root, Map(), table.partitionColumnNames.map(_.toLowerCase), threshold)
+    val partitionSpecsAndLocs = scanPartitions(spark, fs, pathFilter, root, Map(),
+      table.partitionColumnNames, threshold, spark.sessionState.conf.resolver)
     val total = partitionSpecsAndLocs.length
     logInfo(s"Found $total partitions in $root")
 
@@ -531,6 +529,11 @@ case class AlterTableRecoverPartitionsCommand(
     logInfo(s"Finished to gather the fast stats for all $total partitions.")
 
     addPartitions(spark, table, partitionSpecsAndLocs, partitionStats)
+    // Updates the table to indicate that its partition metadata is stored in the Hive metastore.
+    // This is always the case for Hive format tables, but is not true for Datasource tables created
+    // before Spark 2.1 unless they are converted via `msck repair table`.
+    spark.sessionState.catalog.alterTable(table.copy(partitionProviderIsHive = true))
+    catalog.refreshTable(tableName)
     logInfo(s"Recovered all partitions ($total).")
     Seq.empty[Row]
   }
@@ -544,7 +547,8 @@ case class AlterTableRecoverPartitionsCommand(
       path: Path,
       spec: TablePartitionSpec,
       partitionNames: Seq[String],
-      threshold: Int): GenSeq[(TablePartitionSpec, Path)] = {
+      threshold: Int,
+      resolver: Resolver): GenSeq[(TablePartitionSpec, Path)] = {
     if (partitionNames.isEmpty) {
       return Seq(spec -> path)
     }
@@ -563,15 +567,15 @@ case class AlterTableRecoverPartitionsCommand(
       val name = st.getPath.getName
       if (st.isDirectory && name.contains("=")) {
         val ps = name.split("=", 2)
-        val columnName = PartitioningUtils.unescapePathName(ps(0)).toLowerCase
+        val columnName = PartitioningUtils.unescapePathName(ps(0))
         // TODO: Validate the value
         val value = PartitioningUtils.unescapePathName(ps(1))
-        // comparing with case-insensitive, but preserve the case
-        if (columnName == partitionNames.head) {
-          scanPartitions(spark, fs, filter, st.getPath, spec ++ Map(columnName -> value),
-            partitionNames.drop(1), threshold)
+        if (resolver(columnName, partitionNames.head)) {
+          scanPartitions(spark, fs, filter, st.getPath, spec ++ Map(partitionNames.head -> value),
+            partitionNames.drop(1), threshold, resolver)
         } else {
-          logWarning(s"expect partition column ${partitionNames.head}, but got ${ps(0)}, ignore it")
+          logWarning(
+            s"expected partition column ${partitionNames.head}, but got ${ps(0)}, ignoring it")
           Seq()
         }
       } else {
@@ -676,16 +680,11 @@ case class AlterTableSetLocationCommand(
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
     partitionSpec match {
       case Some(spec) =>
+        DDLUtils.verifyPartitionProviderIsHive(
+          sparkSession, table, "ALTER TABLE ... SET LOCATION")
         // Partition spec is specified, so we set the location only for this partition
         val part = catalog.getPartition(table.identifier, spec)
-        val newPart =
-          if (DDLUtils.isDatasourceTable(table)) {
-            throw new AnalysisException(
-              "ALTER TABLE SET LOCATION for partition is not allowed for tables defined " +
-              "using the datasource API")
-          } else {
-            part.copy(storage = part.storage.copy(locationUri = Some(location)))
-          }
+        val newPart = part.copy(storage = part.storage.copy(locationUri = Some(location)))
         catalog.alterPartitions(table.identifier, Seq(newPart))
       case None =>
         // No partition spec is specified, so we set the location for the table itself
@@ -710,6 +709,25 @@ object DDLUtils {
   }
 
   /**
+   * Throws a standard error for actions that require partitionProvider = hive.
+   */
+  def verifyPartitionProviderIsHive(
+      spark: SparkSession, table: CatalogTable, action: String): Unit = {
+    val tableName = table.identifier.table
+    if (!spark.sqlContext.conf.manageFilesourcePartitions && isDatasourceTable(table)) {
+      throw new AnalysisException(
+        s"$action is not allowed on $tableName since filesource partition management is " +
+          "disabled (spark.sql.hive.manageFilesourcePartitions = false).")
+    }
+    if (!table.partitionProviderIsHive && isDatasourceTable(table)) {
+      throw new AnalysisException(
+        s"$action is not allowed on $tableName since its partition metadata is not stored in " +
+          "the Hive metastore. To import this information into the metastore, run " +
+          s"`msck repair table $tableName`")
+    }
+  }
+
+  /**
    * If the command ALTER VIEW is to alter a table or ALTER TABLE is to alter a view,
    * issue an exception [[AnalysisException]].
    *

http://git-wip-us.apache.org/repos/asf/spark/blob/ccb11543/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index aec2543..4acfffb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -358,19 +358,16 @@ case class TruncateTableCommand(
       throw new AnalysisException(
         s"Operation not allowed: TRUNCATE TABLE on views: $tableIdentwithDB")
     }
-    val isDatasourceTable = DDLUtils.isDatasourceTable(table)
-    if (isDatasourceTable && partitionSpec.isDefined) {
-      throw new AnalysisException(
-        s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported " +
-        s"for tables created using the data sources API: $tableIdentwithDB")
-    }
     if (table.partitionColumnNames.isEmpty && partitionSpec.isDefined) {
       throw new AnalysisException(
         s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported " +
         s"for tables that are not partitioned: $tableIdentwithDB")
     }
+    if (partitionSpec.isDefined) {
+      DDLUtils.verifyPartitionProviderIsHive(spark, table, "TRUNCATE TABLE ... PARTITION")
+    }
     val locations =
-      if (isDatasourceTable) {
+      if (DDLUtils.isDatasourceTable(table)) {
         Seq(table.storage.properties.get("path"))
       } else if (table.partitionColumnNames.isEmpty) {
         Seq(table.storage.locationUri)
@@ -453,7 +450,7 @@ case class DescribeTableCommand(
           describeFormattedTableInfo(metadata, result)
         }
       } else {
-        describeDetailedPartitionInfo(catalog, metadata, result)
+        describeDetailedPartitionInfo(sparkSession, catalog, metadata, result)
       }
     }
 
@@ -492,6 +489,10 @@ case class DescribeTableCommand(
     describeStorageInfo(table, buffer)
 
     if (table.tableType == CatalogTableType.VIEW) describeViewInfo(table, buffer)
+
+    if (DDLUtils.isDatasourceTable(table) && table.partitionProviderIsHive) {
+      append(buffer, "Partition Provider:", "Hive", "")
+    }
   }
 
   private def describeStorageInfo(metadata: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
@@ -528,6 +529,7 @@ case class DescribeTableCommand(
   }
 
   private def describeDetailedPartitionInfo(
+      spark: SparkSession,
       catalog: SessionCatalog,
       metadata: CatalogTable,
       result: ArrayBuffer[Row]): Unit = {
@@ -535,10 +537,7 @@ case class DescribeTableCommand(
       throw new AnalysisException(
         s"DESC PARTITION is not allowed on a view: ${table.identifier}")
     }
-    if (DDLUtils.isDatasourceTable(metadata)) {
-      throw new AnalysisException(
-        s"DESC PARTITION is not allowed on a datasource table: ${table.identifier}")
-    }
+    DDLUtils.verifyPartitionProviderIsHive(spark, metadata, "DESC PARTITION")
     val partition = catalog.getPartition(table, partitionSpec)
     if (isExtended) {
       describeExtendedDetailedPartitionInfo(table, metadata, partition, result)
@@ -743,10 +742,7 @@ case class ShowPartitionsCommand(
         s"SHOW PARTITIONS is not allowed on a table that is not partitioned: $tableIdentWithDB")
     }
 
-    if (DDLUtils.isDatasourceTable(table)) {
-      throw new AnalysisException(
-        s"SHOW PARTITIONS is not allowed on a datasource table: $tableIdentWithDB")
-    }
+    DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "SHOW PARTITIONS")
 
     /**
      * Validate the partitioning spec by making sure all the referenced columns are
@@ -894,18 +890,11 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman
 
   private def showHiveTableProperties(metadata: CatalogTable, builder: StringBuilder): Unit = {
     if (metadata.properties.nonEmpty) {
-      val filteredProps = metadata.properties.filterNot {
-        // Skips "EXTERNAL" property for external tables
-        case (key, _) => key == "EXTERNAL" && metadata.tableType == EXTERNAL
-      }
-
-      val props = filteredProps.map { case (key, value) =>
+      val props = metadata.properties.map { case (key, value) =>
         s"'${escapeSingleQuotedString(key)}' = '${escapeSingleQuotedString(value)}'"
       }
 
-      if (props.nonEmpty) {
-        builder ++= props.mkString("TBLPROPERTIES (\n  ", ",\n  ", "\n)\n")
-      }
+      builder ++= props.mkString("TBLPROPERTIES (\n  ", ",\n  ", "\n)\n")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ccb11543/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 17da606..5b8f05a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -30,7 +30,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
@@ -65,6 +65,8 @@ import org.apache.spark.util.Utils
  * @param partitionColumns A list of column names that the relation is partitioned by. When this
  *                         list is empty, the relation is unpartitioned.
  * @param bucketSpec An optional specification for bucketing (hash-partitioning) of the data.
+ * @param catalogTable Optional catalog table reference that can be used to push down operations
+ *                     over the datasource to the catalog service.
  */
 case class DataSource(
     sparkSession: SparkSession,
@@ -73,7 +75,8 @@ case class DataSource(
     userSpecifiedSchema: Option[StructType] = None,
     partitionColumns: Seq[String] = Seq.empty,
     bucketSpec: Option[BucketSpec] = None,
-    options: Map[String, String] = Map.empty) extends Logging {
+    options: Map[String, String] = Map.empty,
+    catalogTable: Option[CatalogTable] = None) extends Logging {
 
   case class SourceInfo(name: String, schema: StructType, partitionColumns: Seq[String])
 
@@ -412,9 +415,16 @@ case class DataSource(
             })
         }
 
-        val fileCatalog =
+        val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions &&
+            catalogTable.isDefined && catalogTable.get.partitionProviderIsHive) {
+          new TableFileCatalog(
+            sparkSession,
+            catalogTable.get,
+            catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(0L))
+        } else {
           new ListingFileCatalog(
             sparkSession, globbedPaths, options, partitionSchema)
+        }
 
         val dataSchema = userSpecifiedSchema.map { schema =>
           val equality = sparkSession.sessionState.conf.resolver
@@ -423,7 +433,7 @@ case class DataSource(
           format.inferSchema(
             sparkSession,
             caseInsensitiveOptions,
-            fileCatalog.allFiles())
+            fileCatalog.asInstanceOf[ListingFileCatalog].allFiles())
         }.getOrElse {
           throw new AnalysisException(
             s"Unable to infer schema for $format at ${allPaths.take(2).mkString(",")}. " +
@@ -432,7 +442,7 @@ case class DataSource(
 
         HadoopFsRelation(
           fileCatalog,
-          partitionSchema = fileCatalog.partitionSpec().partitionColumns,
+          partitionSchema = fileCatalog.partitionSchema,
           dataSchema = dataSchema.asNullable,
           bucketSpec = bucketSpec,
           format,

http://git-wip-us.apache.org/repos/asf/spark/blob/ccb11543/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 7d0abe8..f0bcf94 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -30,11 +30,11 @@ import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union}
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, UnknownPartitioning}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
-import org.apache.spark.sql.execution.command.{DDLUtils, ExecutedCommandExec}
+import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, DDLUtils, ExecutedCommandExec}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -179,7 +179,7 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
           "Cannot overwrite a path that is also being read from.")
       }
 
-      InsertIntoHadoopFsRelationCommand(
+      val insertCmd = InsertIntoHadoopFsRelationCommand(
         outputPath,
         query.resolve(t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver),
         t.bucketSpec,
@@ -188,6 +188,15 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
         t.options,
         query,
         mode)
+
+      if (l.catalogTable.isDefined && l.catalogTable.get.partitionColumnNames.nonEmpty &&
+          l.catalogTable.get.partitionProviderIsHive) {
+        // TODO(ekl) we should be more efficient here and only recover the newly added partitions
+        val recoverPartitionCmd = AlterTableRecoverPartitionsCommand(l.catalogTable.get.identifier)
+        Union(insertCmd, recoverPartitionCmd)
+      } else {
+        insertCmd
+      }
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ccb11543/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala
index 2bc66ce..dba6462 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala
@@ -21,6 +21,7 @@ import org.apache.hadoop.fs._
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types.StructType
 
 /**
  * A collection of data files from a partitioned relation, along with the partition values in the
@@ -63,4 +64,7 @@ trait FileCatalog {
 
   /** Sum of table file sizes, in bytes */
   def sizeInBytes: Long
+
+  /** Schema of the partitioning columns, or the empty schema if the table is not partitioned. */
+  def partitionSchema: StructType
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ccb11543/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
index e0ec748..7c2e6fd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
@@ -64,7 +64,7 @@ object FileStatusCache {
    */
   def newCache(session: SparkSession): FileStatusCache = {
     synchronized {
-      if (session.sqlContext.conf.filesourcePartitionPruning &&
+      if (session.sqlContext.conf.manageFilesourcePartitions &&
           session.sqlContext.conf.filesourcePartitionFileCacheSize > 0) {
         if (sharedCache == null) {
           sharedCache = new SharedInMemoryCache(

http://git-wip-us.apache.org/repos/asf/spark/blob/ccb11543/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
index 9b1903c..cc4049e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
@@ -38,19 +38,21 @@ import org.apache.spark.util.SerializableConfiguration
  * It provides the necessary methods to parse partition data based on a set of files.
  *
  * @param parameters as set of options to control partition discovery
- * @param partitionSchema an optional partition schema that will be use to provide types for the
- *                        discovered partitions
-*/
+ * @param userPartitionSchema an optional partition schema that will be use to provide types for
+ *                            the discovered partitions
+ */
 abstract class PartitioningAwareFileCatalog(
     sparkSession: SparkSession,
     parameters: Map[String, String],
-    partitionSchema: Option[StructType],
+    userPartitionSchema: Option[StructType],
     fileStatusCache: FileStatusCache = NoopCache) extends FileCatalog with Logging {
   import PartitioningAwareFileCatalog.BASE_PATH_PARAM
 
   /** Returns the specification of the partitions inferred from the data. */
   def partitionSpec(): PartitionSpec
 
+  override def partitionSchema: StructType = partitionSpec().partitionColumns
+
   protected val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(parameters)
 
   protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus]
@@ -122,7 +124,7 @@ abstract class PartitioningAwareFileCatalog(
     val leafDirs = leafDirToChildrenFiles.filter { case (_, files) =>
       files.exists(f => isDataPath(f.getPath))
     }.keys.toSeq
-    partitionSchema match {
+    userPartitionSchema match {
       case Some(userProvidedSchema) if userProvidedSchema.nonEmpty =>
         val spec = PartitioningUtils.parsePartitions(
           leafDirs,

http://git-wip-us.apache.org/repos/asf/spark/blob/ccb11543/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
index 667379b..b459df5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types.StructType
 
 
 /**
@@ -45,6 +46,8 @@ class TableFileCatalog(
 
   private val baseLocation = table.storage.locationUri
 
+  override def partitionSchema: StructType = table.partitionSchema
+
   override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq
 
   override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = {
@@ -63,7 +66,6 @@ class TableFileCatalog(
     if (table.partitionColumnNames.nonEmpty) {
       val selectedPartitions = sparkSession.sessionState.catalog.listPartitionsByFilter(
         table.identifier, filters)
-      val partitionSchema = table.partitionSchema
       val partitions = selectedPartitions.map { p =>
         PartitionPath(p.toRow(partitionSchema), p.storage.locationUri.get)
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/ccb11543/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index f47ec7f..dc31f3b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -272,18 +272,20 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
-  val HIVE_FILESOURCE_PARTITION_PRUNING =
-    SQLConfigBuilder("spark.sql.hive.filesourcePartitionPruning")
-      .doc("When true, enable metastore partition pruning for filesource relations as well. " +
-           "This is currently implemented for converted Hive tables only.")
+  val HIVE_MANAGE_FILESOURCE_PARTITIONS =
+    SQLConfigBuilder("spark.sql.hive.manageFilesourcePartitions")
+      .doc("When true, enable metastore partition management for file source tables as well. " +
+           "This includes both datasource and converted Hive tables. When partition managment " +
+           "is enabled, datasource tables store partition in the Hive metastore, and use the " +
+           "metastore to prune partitions during query planning.")
       .booleanConf
       .createWithDefault(true)
 
   val HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE =
     SQLConfigBuilder("spark.sql.hive.filesourcePartitionFileCacheSize")
-      .doc("When nonzero, enable caching of partition file metadata in memory. All table share " +
+      .doc("When nonzero, enable caching of partition file metadata in memory. All tables share " +
            "a cache that can use up to specified num bytes for file metadata. This conf only " +
-           "applies if filesource partition pruning is also enabled.")
+           "has an effect when hive filesource partition management is enabled.")
       .longConf
       .createWithDefault(250 * 1024 * 1024)
 
@@ -679,7 +681,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def metastorePartitionPruning: Boolean = getConf(HIVE_METASTORE_PARTITION_PRUNING)
 
-  def filesourcePartitionPruning: Boolean = getConf(HIVE_FILESOURCE_PARTITION_PRUNING)
+  def manageFilesourcePartitions: Boolean = getConf(HIVE_MANAGE_FILESOURCE_PARTITIONS)
 
   def filesourcePartitionFileCacheSize: Long = getConf(HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ccb11543/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
index 6857dd3..2d73d9f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
@@ -197,7 +197,7 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext {
       assertResult(expected.schema, s"Schema did not match for query #$i\n${expected.sql}") {
         output.schema
       }
-      assertResult(expected.output, s"Result dit not match for query #$i\n${expected.sql}") {
+      assertResult(expected.output, s"Result did not match for query #$i\n${expected.sql}") {
         output.output
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/ccb11543/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index b989d01..9fb0f53 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -95,7 +95,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
         .add("b", "int"),
       provider = Some("hive"),
       partitionColumnNames = Seq("a", "b"),
-      createTime = 0L)
+      createTime = 0L,
+      partitionProviderIsHive = true)
   }
 
   private def createTable(catalog: SessionCatalog, name: TableIdentifier): Unit = {
@@ -923,68 +924,11 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
   }
 
   test("alter table: rename partition") {
-    val catalog = spark.sessionState.catalog
-    val tableIdent = TableIdentifier("tab1", Some("dbx"))
-    createPartitionedTable(tableIdent, isDatasourceTable = false)
-
-    // basic rename partition
-    sql("ALTER TABLE dbx.tab1 PARTITION (a='1', b='q') RENAME TO PARTITION (a='100', b='p')")
-    sql("ALTER TABLE dbx.tab1 PARTITION (a='2', b='c') RENAME TO PARTITION (a='20', b='c')")
-    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
-      Set(Map("a" -> "100", "b" -> "p"), Map("a" -> "20", "b" -> "c"), Map("a" -> "3", "b" -> "p")))
-
-    // rename without explicitly specifying database
-    catalog.setCurrentDatabase("dbx")
-    sql("ALTER TABLE tab1 PARTITION (a='100', b='p') RENAME TO PARTITION (a='10', b='p')")
-    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
-      Set(Map("a" -> "10", "b" -> "p"), Map("a" -> "20", "b" -> "c"), Map("a" -> "3", "b" -> "p")))
-
-    // table to alter does not exist
-    intercept[NoSuchTableException] {
-      sql("ALTER TABLE does_not_exist PARTITION (c='3') RENAME TO PARTITION (c='333')")
-    }
-
-    // partition to rename does not exist
-    intercept[NoSuchPartitionException] {
-      sql("ALTER TABLE tab1 PARTITION (a='not_found', b='1') RENAME TO PARTITION (a='1', b='2')")
-    }
-
-    // partition spec in RENAME PARTITION should be case insensitive by default
-    sql("ALTER TABLE tab1 PARTITION (A='10', B='p') RENAME TO PARTITION (A='1', B='p')")
-    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
-      Set(Map("a" -> "1", "b" -> "p"), Map("a" -> "20", "b" -> "c"), Map("a" -> "3", "b" -> "p")))
+    testRenamePartitions(isDatasourceTable = false)
   }
 
   test("alter table: rename partition (datasource table)") {
-    createPartitionedTable(TableIdentifier("tab1", Some("dbx")), isDatasourceTable = true)
-    val e = intercept[AnalysisException] {
-      sql("ALTER TABLE dbx.tab1 PARTITION (a='1', b='q') RENAME TO PARTITION (a='100', b='p')")
-    }.getMessage
-    assert(e.contains(
-      "ALTER TABLE RENAME PARTITION is not allowed for tables defined using the datasource API"))
-    // table to alter does not exist
-    intercept[NoSuchTableException] {
-      sql("ALTER TABLE does_not_exist PARTITION (c='3') RENAME TO PARTITION (c='333')")
-    }
-  }
-
-  private def createPartitionedTable(
-      tableIdent: TableIdentifier,
-      isDatasourceTable: Boolean): Unit = {
-    val catalog = spark.sessionState.catalog
-    val part1 = Map("a" -> "1", "b" -> "q")
-    val part2 = Map("a" -> "2", "b" -> "c")
-    val part3 = Map("a" -> "3", "b" -> "p")
-    createDatabase(catalog, "dbx")
-    createTable(catalog, tableIdent)
-    createTablePartition(catalog, part1, tableIdent)
-    createTablePartition(catalog, part2, tableIdent)
-    createTablePartition(catalog, part3, tableIdent)
-    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
-      Set(part1, part2, part3))
-    if (isDatasourceTable) {
-      convertToDatasourceTable(catalog, tableIdent)
-    }
+    testRenamePartitions(isDatasourceTable = true)
   }
 
   test("show tables") {
@@ -1199,7 +1143,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       if (isDatasourceTable) {
         if (spec.isDefined) {
           assert(storageFormat.properties.isEmpty)
-          assert(storageFormat.locationUri.isEmpty)
+          assert(storageFormat.locationUri === Some(expected))
         } else {
           assert(storageFormat.properties.get("path") === Some(expected))
           assert(storageFormat.locationUri === Some(expected))
@@ -1212,18 +1156,14 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     sql("ALTER TABLE dbx.tab1 SET LOCATION '/path/to/your/lovely/heart'")
     verifyLocation("/path/to/your/lovely/heart")
     // set table partition location
-    maybeWrapException(isDatasourceTable) {
-      sql("ALTER TABLE dbx.tab1 PARTITION (a='1', b='2') SET LOCATION '/path/to/part/ways'")
-    }
+    sql("ALTER TABLE dbx.tab1 PARTITION (a='1', b='2') SET LOCATION '/path/to/part/ways'")
     verifyLocation("/path/to/part/ways", Some(partSpec))
     // set table location without explicitly specifying database
     catalog.setCurrentDatabase("dbx")
     sql("ALTER TABLE tab1 SET LOCATION '/swanky/steak/place'")
     verifyLocation("/swanky/steak/place")
     // set table partition location without explicitly specifying database
-    maybeWrapException(isDatasourceTable) {
-      sql("ALTER TABLE tab1 PARTITION (a='1', b='2') SET LOCATION 'vienna'")
-    }
+    sql("ALTER TABLE tab1 PARTITION (a='1', b='2') SET LOCATION 'vienna'")
     verifyLocation("vienna", Some(partSpec))
     // table to alter does not exist
     intercept[AnalysisException] {
@@ -1354,26 +1294,18 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
 
     // basic add partition
-    maybeWrapException(isDatasourceTable) {
-      sql("ALTER TABLE dbx.tab1 ADD IF NOT EXISTS " +
-        "PARTITION (a='2', b='6') LOCATION 'paris' PARTITION (a='3', b='7')")
-    }
-    if (!isDatasourceTable) {
-      assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2, part3))
-      assert(catalog.getPartition(tableIdent, part1).storage.locationUri.isEmpty)
-      assert(catalog.getPartition(tableIdent, part2).storage.locationUri == Option("paris"))
-      assert(catalog.getPartition(tableIdent, part3).storage.locationUri.isEmpty)
-    }
+    sql("ALTER TABLE dbx.tab1 ADD IF NOT EXISTS " +
+      "PARTITION (a='2', b='6') LOCATION 'paris' PARTITION (a='3', b='7')")
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2, part3))
+    assert(catalog.getPartition(tableIdent, part1).storage.locationUri.isEmpty)
+    assert(catalog.getPartition(tableIdent, part2).storage.locationUri == Option("paris"))
+    assert(catalog.getPartition(tableIdent, part3).storage.locationUri.isEmpty)
 
     // add partitions without explicitly specifying database
     catalog.setCurrentDatabase("dbx")
-    maybeWrapException(isDatasourceTable) {
-      sql("ALTER TABLE tab1 ADD IF NOT EXISTS PARTITION (a='4', b='8')")
-    }
-    if (!isDatasourceTable) {
-      assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
-        Set(part1, part2, part3, part4))
-    }
+    sql("ALTER TABLE tab1 ADD IF NOT EXISTS PARTITION (a='4', b='8')")
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+      Set(part1, part2, part3, part4))
 
     // table to alter does not exist
     intercept[AnalysisException] {
@@ -1386,22 +1318,14 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     }
 
     // partition to add already exists when using IF NOT EXISTS
-    maybeWrapException(isDatasourceTable) {
-      sql("ALTER TABLE tab1 ADD IF NOT EXISTS PARTITION (a='4', b='8')")
-    }
-    if (!isDatasourceTable) {
-      assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
-        Set(part1, part2, part3, part4))
-    }
+    sql("ALTER TABLE tab1 ADD IF NOT EXISTS PARTITION (a='4', b='8')")
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+      Set(part1, part2, part3, part4))
 
     // partition spec in ADD PARTITION should be case insensitive by default
-    maybeWrapException(isDatasourceTable) {
-      sql("ALTER TABLE tab1 ADD PARTITION (A='9', B='9')")
-    }
-    if (!isDatasourceTable) {
-      assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
-        Set(part1, part2, part3, part4, part5))
-    }
+    sql("ALTER TABLE tab1 ADD PARTITION (A='9', B='9')")
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+      Set(part1, part2, part3, part4, part5))
   }
 
   private def testDropPartitions(isDatasourceTable: Boolean): Unit = {
@@ -1424,21 +1348,13 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     }
 
     // basic drop partition
-    maybeWrapException(isDatasourceTable) {
-      sql("ALTER TABLE dbx.tab1 DROP IF EXISTS PARTITION (a='4', b='8'), PARTITION (a='3', b='7')")
-    }
-    if (!isDatasourceTable) {
-      assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2))
-    }
+    sql("ALTER TABLE dbx.tab1 DROP IF EXISTS PARTITION (a='4', b='8'), PARTITION (a='3', b='7')")
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2))
 
     // drop partitions without explicitly specifying database
     catalog.setCurrentDatabase("dbx")
-    maybeWrapException(isDatasourceTable) {
-      sql("ALTER TABLE tab1 DROP IF EXISTS PARTITION (a='2', b ='6')")
-    }
-    if (!isDatasourceTable) {
-      assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
-    }
+    sql("ALTER TABLE tab1 DROP IF EXISTS PARTITION (a='2', b ='6')")
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
 
     // table to alter does not exist
     intercept[AnalysisException] {
@@ -1451,20 +1367,56 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     }
 
     // partition to drop does not exist when using IF EXISTS
-    maybeWrapException(isDatasourceTable) {
-      sql("ALTER TABLE tab1 DROP IF EXISTS PARTITION (a='300')")
-    }
-    if (!isDatasourceTable) {
-      assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
-    }
+    sql("ALTER TABLE tab1 DROP IF EXISTS PARTITION (a='300')")
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
 
     // partition spec in DROP PARTITION should be case insensitive by default
-    maybeWrapException(isDatasourceTable) {
-      sql("ALTER TABLE tab1 DROP PARTITION (A='1', B='5')")
+    sql("ALTER TABLE tab1 DROP PARTITION (A='1', B='5')")
+    assert(catalog.listPartitions(tableIdent).isEmpty)
+  }
+
+  private def testRenamePartitions(isDatasourceTable: Boolean): Unit = {
+    val catalog = spark.sessionState.catalog
+    val tableIdent = TableIdentifier("tab1", Some("dbx"))
+    val part1 = Map("a" -> "1", "b" -> "q")
+    val part2 = Map("a" -> "2", "b" -> "c")
+    val part3 = Map("a" -> "3", "b" -> "p")
+    createDatabase(catalog, "dbx")
+    createTable(catalog, tableIdent)
+    createTablePartition(catalog, part1, tableIdent)
+    createTablePartition(catalog, part2, tableIdent)
+    createTablePartition(catalog, part3, tableIdent)
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2, part3))
+    if (isDatasourceTable) {
+      convertToDatasourceTable(catalog, tableIdent)
+    }
+
+    // basic rename partition
+    sql("ALTER TABLE dbx.tab1 PARTITION (a='1', b='q') RENAME TO PARTITION (a='100', b='p')")
+    sql("ALTER TABLE dbx.tab1 PARTITION (a='2', b='c') RENAME TO PARTITION (a='20', b='c')")
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+      Set(Map("a" -> "100", "b" -> "p"), Map("a" -> "20", "b" -> "c"), Map("a" -> "3", "b" -> "p")))
+
+    // rename without explicitly specifying database
+    catalog.setCurrentDatabase("dbx")
+    sql("ALTER TABLE tab1 PARTITION (a='100', b='p') RENAME TO PARTITION (a='10', b='p')")
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+      Set(Map("a" -> "10", "b" -> "p"), Map("a" -> "20", "b" -> "c"), Map("a" -> "3", "b" -> "p")))
+
+    // table to alter does not exist
+    intercept[NoSuchTableException] {
+      sql("ALTER TABLE does_not_exist PARTITION (c='3') RENAME TO PARTITION (c='333')")
     }
-    if (!isDatasourceTable) {
-      assert(catalog.listPartitions(tableIdent).isEmpty)
+
+    // partition to rename does not exist
+    intercept[NoSuchPartitionException] {
+      sql("ALTER TABLE tab1 PARTITION (a='not_found', b='1') RENAME TO PARTITION (a='1', b='2')")
     }
+
+    // partition spec in RENAME PARTITION should be case insensitive by default
+    sql("ALTER TABLE tab1 PARTITION (A='10', B='p') RENAME TO PARTITION (A='1', B='p')")
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+      Set(Map("a" -> "1", "b" -> "p"), Map("a" -> "20", "b" -> "c"), Map("a" -> "3", "b" -> "p")))
   }
 
   test("drop build-in function") {
@@ -1683,12 +1635,16 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       }
     }
 
-    // truncating partitioned data source tables is not supported
     withTable("rectangles", "rectangles2") {
       data.write.saveAsTable("rectangles")
       data.write.partitionBy("length").saveAsTable("rectangles2")
+
+      // not supported since the table is not partitioned
       assertUnsupported("TRUNCATE TABLE rectangles PARTITION (width=1)")
-      assertUnsupported("TRUNCATE TABLE rectangles2 PARTITION (width=1)")
+
+      // supported since partitions are stored in the metastore
+      sql("TRUNCATE TABLE rectangles2 PARTITION (width=1)")
+      assert(spark.table("rectangles2").collect().isEmpty)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ccb11543/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 2003ff4..409c316 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -38,6 +38,7 @@ import org.apache.spark.sql.execution.command.{ColumnStatStruct, DDLUtils}
 import org.apache.spark.sql.execution.datasources.CaseInsensitiveMap
 import org.apache.spark.sql.hive.client.HiveClient
 import org.apache.spark.sql.internal.HiveSerDe
+import org.apache.spark.sql.internal.SQLConf._
 import org.apache.spark.sql.internal.StaticSQLConf._
 import org.apache.spark.sql.types.{DataType, StructField, StructType}
 
@@ -105,13 +106,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
    * metastore.
    */
   private def verifyTableProperties(table: CatalogTable): Unit = {
-    val invalidKeys = table.properties.keys.filter { key =>
-      key.startsWith(DATASOURCE_PREFIX) || key.startsWith(STATISTICS_PREFIX)
-    }
+    val invalidKeys = table.properties.keys.filter(_.startsWith(SPARK_SQL_PREFIX))
     if (invalidKeys.nonEmpty) {
       throw new AnalysisException(s"Cannot persistent ${table.qualifiedName} into hive metastore " +
-        s"as table property keys may not start with '$DATASOURCE_PREFIX' or '$STATISTICS_PREFIX':" +
-        s" ${invalidKeys.mkString("[", ", ", "]")}")
+        s"as table property keys may not start with '$SPARK_SQL_PREFIX': " +
+        invalidKeys.mkString("[", ", ", "]"))
     }
     // External users are not allowed to set/switch the table type. In Hive metastore, the table
     // type can be switched by changing the value of a case-sensitive table property `EXTERNAL`.
@@ -190,11 +189,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       throw new TableAlreadyExistsException(db = db, table = table)
     }
     // Before saving data source table metadata into Hive metastore, we should:
-    //  1. Put table schema, partition column names and bucket specification in table properties.
+    //  1. Put table provider, schema, partition column names, bucket specification and partition
+    //     provider in table properties.
     //  2. Check if this table is hive compatible
     //    2.1  If it's not hive compatible, set schema, partition columns and bucket spec to empty
     //         and save table metadata to Hive.
-    //    2.1  If it's hive compatible, set serde information in table metadata and try to save
+    //    2.2  If it's hive compatible, set serde information in table metadata and try to save
     //         it to Hive. If it fails, treat it as not hive compatible and go back to 2.1
     if (DDLUtils.isDatasourceTable(tableDefinition)) {
       // data source table always have a provider, it's guaranteed by `DDLUtils.isDatasourceTable`.
@@ -204,6 +204,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
 
       val tableProperties = new scala.collection.mutable.HashMap[String, String]
       tableProperties.put(DATASOURCE_PROVIDER, provider)
+      if (tableDefinition.partitionProviderIsHive) {
+        tableProperties.put(TABLE_PARTITION_PROVIDER, "hive")
+      }
 
       // Serialized JSON schema string may be too long to be stored into a single metastore table
       // property. In this case, we split the JSON string and store each part as a separate table
@@ -241,12 +244,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
         }
       }
 
-      // converts the table metadata to Spark SQL specific format, i.e. set schema, partition column
-      // names and bucket specification to empty.
+      // converts the table metadata to Spark SQL specific format, i.e. set data schema, names and
+      // bucket specification to empty. Note that partition columns are retained, so that we can
+      // call partition-related Hive API later.
       def newSparkSQLSpecificMetastoreTable(): CatalogTable = {
         tableDefinition.copy(
-          schema = new StructType,
-          partitionColumnNames = Nil,
+          schema = tableDefinition.partitionSchema,
           bucketSpec = None,
           properties = tableDefinition.properties ++ tableProperties)
       }
@@ -419,12 +422,17 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       // Sets the `schema`, `partitionColumnNames` and `bucketSpec` from the old table definition,
       // to retain the spark specific format if it is. Also add old data source properties to table
       // properties, to retain the data source table format.
-      val oldDataSourceProps = oldDef.properties.filter(_._1.startsWith(DATASOURCE_PREFIX))
+      val oldDataSourceProps = oldDef.properties.filter(_._1.startsWith(SPARK_SQL_PREFIX))
+      val partitionProviderProp = if (tableDefinition.partitionProviderIsHive) {
+        TABLE_PARTITION_PROVIDER -> "hive"
+      } else {
+        TABLE_PARTITION_PROVIDER -> "builtin"
+      }
       val newDef = withStatsProps.copy(
         schema = oldDef.schema,
         partitionColumnNames = oldDef.partitionColumnNames,
         bucketSpec = oldDef.bucketSpec,
-        properties = oldDataSourceProps ++ withStatsProps.properties)
+        properties = oldDataSourceProps ++ withStatsProps.properties + partitionProviderProp)
 
       client.alterTable(newDef)
     } else {
@@ -448,7 +456,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
    * properties, and filter out these special entries from table properties.
    */
   private def restoreTableMetadata(table: CatalogTable): CatalogTable = {
-    val catalogTable = if (table.tableType == VIEW || conf.get(DEBUG_MODE)) {
+    if (conf.get(DEBUG_MODE)) {
+      return table
+    }
+
+    val tableWithSchema = if (table.tableType == VIEW) {
       table
     } else {
       getProviderFromTableProperties(table).map { provider =>
@@ -473,30 +485,32 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
           provider = Some(provider),
           partitionColumnNames = getPartitionColumnsFromTableProperties(table),
           bucketSpec = getBucketSpecFromTableProperties(table),
-          properties = getOriginalTableProperties(table))
+          partitionProviderIsHive = table.properties.get(TABLE_PARTITION_PROVIDER) == Some("hive"))
       } getOrElse {
-        table.copy(provider = Some("hive"))
+        table.copy(provider = Some("hive"), partitionProviderIsHive = true)
       }
     }
+
     // construct Spark's statistics from information in Hive metastore
-    val statsProps = catalogTable.properties.filterKeys(_.startsWith(STATISTICS_PREFIX))
-    if (statsProps.nonEmpty) {
+    val statsProps = tableWithSchema.properties.filterKeys(_.startsWith(STATISTICS_PREFIX))
+    val tableWithStats = if (statsProps.nonEmpty) {
       val colStatsProps = statsProps.filterKeys(_.startsWith(STATISTICS_COL_STATS_PREFIX))
         .map { case (k, v) => (k.drop(STATISTICS_COL_STATS_PREFIX.length), v) }
-      val colStats: Map[String, ColumnStat] = catalogTable.schema.collect {
+      val colStats: Map[String, ColumnStat] = tableWithSchema.schema.collect {
         case f if colStatsProps.contains(f.name) =>
           val numFields = ColumnStatStruct.numStatFields(f.dataType)
           (f.name, ColumnStat(numFields, colStatsProps(f.name)))
       }.toMap
-      catalogTable.copy(
-        properties = removeStatsProperties(catalogTable),
+      tableWithSchema.copy(
         stats = Some(Statistics(
-          sizeInBytes = BigInt(catalogTable.properties(STATISTICS_TOTAL_SIZE)),
-          rowCount = catalogTable.properties.get(STATISTICS_NUM_ROWS).map(BigInt(_)),
+          sizeInBytes = BigInt(tableWithSchema.properties(STATISTICS_TOTAL_SIZE)),
+          rowCount = tableWithSchema.properties.get(STATISTICS_NUM_ROWS).map(BigInt(_)),
           colStats = colStats)))
     } else {
-      catalogTable
+      tableWithSchema
     }
+
+    tableWithStats.copy(properties = getOriginalTableProperties(table))
   }
 
   override def tableExists(db: String, table: String): Boolean = withClient {
@@ -581,13 +595,30 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   // Partitions
   // --------------------------------------------------------------------------
 
+  // Hive metastore is not case preserving and the partition columns are always lower cased. We need
+  // to lower case the column names in partition specification before calling partition related Hive
+  // APIs, to match this behaviour.
+  private def lowerCasePartitionSpec(spec: TablePartitionSpec): TablePartitionSpec = {
+    spec.map { case (k, v) => k.toLowerCase -> v }
+  }
+
+  // Hive metastore is not case preserving and the column names of the partition specification we
+  // get from the metastore are always lower cased. We should restore them w.r.t. the actual table
+  // partition columns.
+  private def restorePartitionSpec(
+      spec: TablePartitionSpec,
+      partCols: Seq[String]): TablePartitionSpec = {
+    spec.map { case (k, v) => partCols.find(_.equalsIgnoreCase(k)).get -> v }
+  }
+
   override def createPartitions(
       db: String,
       table: String,
       parts: Seq[CatalogTablePartition],
       ignoreIfExists: Boolean): Unit = withClient {
     requireTableExists(db, table)
-    client.createPartitions(db, table, parts, ignoreIfExists)
+    val lowerCasedParts = parts.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec)))
+    client.createPartitions(db, table, lowerCasedParts, ignoreIfExists)
   }
 
   override def dropPartitions(
@@ -597,7 +628,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       ignoreIfNotExists: Boolean,
       purge: Boolean): Unit = withClient {
     requireTableExists(db, table)
-    client.dropPartitions(db, table, parts, ignoreIfNotExists, purge)
+    client.dropPartitions(db, table, parts.map(lowerCasePartitionSpec), ignoreIfNotExists, purge)
   }
 
   override def renamePartitions(
@@ -605,21 +636,24 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       table: String,
       specs: Seq[TablePartitionSpec],
       newSpecs: Seq[TablePartitionSpec]): Unit = withClient {
-    client.renamePartitions(db, table, specs, newSpecs)
+    client.renamePartitions(
+      db, table, specs.map(lowerCasePartitionSpec), newSpecs.map(lowerCasePartitionSpec))
   }
 
   override def alterPartitions(
       db: String,
       table: String,
       newParts: Seq[CatalogTablePartition]): Unit = withClient {
-    client.alterPartitions(db, table, newParts)
+    val lowerCasedParts = newParts.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec)))
+    client.alterPartitions(db, table, lowerCasedParts)
   }
 
   override def getPartition(
       db: String,
       table: String,
       spec: TablePartitionSpec): CatalogTablePartition = withClient {
-    client.getPartition(db, table, spec)
+    val part = client.getPartition(db, table, lowerCasePartitionSpec(spec))
+    part.copy(spec = restorePartitionSpec(part.spec, getTable(db, table).partitionColumnNames))
   }
 
   /**
@@ -629,7 +663,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       db: String,
       table: String,
       spec: TablePartitionSpec): Option[CatalogTablePartition] = withClient {
-    client.getPartitionOption(db, table, spec)
+    client.getPartitionOption(db, table, lowerCasePartitionSpec(spec)).map { part =>
+      part.copy(spec = restorePartitionSpec(part.spec, getTable(db, table).partitionColumnNames))
+    }
   }
 
   /**
@@ -639,14 +675,17 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       db: String,
       table: String,
       partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] = withClient {
-    client.getPartitions(db, table, partialSpec)
+    client.getPartitions(db, table, partialSpec.map(lowerCasePartitionSpec)).map { part =>
+      part.copy(spec = restorePartitionSpec(part.spec, getTable(db, table).partitionColumnNames))
+    }
   }
 
   override def listPartitionsByFilter(
       db: String,
       table: String,
       predicates: Seq[Expression]): Seq[CatalogTablePartition] = withClient {
-    val catalogTable = client.getTable(db, table)
+    val rawTable = client.getTable(db, table)
+    val catalogTable = restoreTableMetadata(rawTable)
     val partitionColumnNames = catalogTable.partitionColumnNames.toSet
     val nonPartitionPruningPredicates = predicates.filterNot {
       _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
@@ -660,19 +699,20 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val partitionSchema = catalogTable.partitionSchema
 
     if (predicates.nonEmpty) {
-      val clientPrunedPartitions =
-        client.getPartitionsByFilter(catalogTable, predicates)
+      val clientPrunedPartitions = client.getPartitionsByFilter(rawTable, predicates).map { part =>
+        part.copy(spec = restorePartitionSpec(part.spec, catalogTable.partitionColumnNames))
+      }
       val boundPredicate =
         InterpretedPredicate.create(predicates.reduce(And).transform {
           case att: AttributeReference =>
             val index = partitionSchema.indexWhere(_.name == att.name)
             BoundReference(index, partitionSchema(index).dataType, nullable = true)
         })
-      clientPrunedPartitions.filter { case p: CatalogTablePartition =>
-        boundPredicate(p.toRow(partitionSchema))
-      }
+      clientPrunedPartitions.filter { p => boundPredicate(p.toRow(partitionSchema)) }
     } else {
-      client.getPartitions(catalogTable)
+      client.getPartitions(catalogTable).map { part =>
+        part.copy(spec = restorePartitionSpec(part.spec, catalogTable.partitionColumnNames))
+      }
     }
   }
 
@@ -722,7 +762,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
 }
 
 object HiveExternalCatalog {
-  val DATASOURCE_PREFIX = "spark.sql.sources."
+  val SPARK_SQL_PREFIX = "spark.sql."
+
+  val DATASOURCE_PREFIX = SPARK_SQL_PREFIX + "sources."
   val DATASOURCE_PROVIDER = DATASOURCE_PREFIX + "provider"
   val DATASOURCE_SCHEMA = DATASOURCE_PREFIX + "schema"
   val DATASOURCE_SCHEMA_PREFIX = DATASOURCE_SCHEMA + "."
@@ -736,21 +778,20 @@ object HiveExternalCatalog {
   val DATASOURCE_SCHEMA_BUCKETCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "bucketCol."
   val DATASOURCE_SCHEMA_SORTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "sortCol."
 
-  val STATISTICS_PREFIX = "spark.sql.statistics."
+  val STATISTICS_PREFIX = SPARK_SQL_PREFIX + "statistics."
   val STATISTICS_TOTAL_SIZE = STATISTICS_PREFIX + "totalSize"
   val STATISTICS_NUM_ROWS = STATISTICS_PREFIX + "numRows"
   val STATISTICS_COL_STATS_PREFIX = STATISTICS_PREFIX + "colStats."
 
-  def removeStatsProperties(metadata: CatalogTable): Map[String, String] = {
-    metadata.properties.filterNot { case (key, _) => key.startsWith(STATISTICS_PREFIX) }
-  }
+  val TABLE_PARTITION_PROVIDER = SPARK_SQL_PREFIX + "partitionProvider"
+
 
   def getProviderFromTableProperties(metadata: CatalogTable): Option[String] = {
     metadata.properties.get(DATASOURCE_PROVIDER)
   }
 
   def getOriginalTableProperties(metadata: CatalogTable): Map[String, String] = {
-    metadata.properties.filterNot { case (key, _) => key.startsWith(DATASOURCE_PREFIX) }
+    metadata.properties.filterNot { case (key, _) => key.startsWith(SPARK_SQL_PREFIX) }
   }
 
   // A persisted data source table always store its schema in the catalog.

http://git-wip-us.apache.org/repos/asf/spark/blob/ccb11543/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 6c1585d..d1de863 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -76,11 +76,10 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
             partitionColumns = table.partitionColumnNames,
             bucketSpec = table.bucketSpec,
             className = table.provider.get,
-            options = table.storage.properties)
+            options = table.storage.properties,
+            catalogTable = Some(table))
 
-        LogicalRelation(
-          dataSource.resolveRelation(),
-          catalogTable = Some(table))
+        LogicalRelation(dataSource.resolveRelation(), catalogTable = Some(table))
       }
     }
 
@@ -194,7 +193,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
       QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
     val bucketSpec = None  // We don't support hive bucketed tables, only ones we write out.
 
-    val lazyPruningEnabled = sparkSession.sqlContext.conf.filesourcePartitionPruning
+    val lazyPruningEnabled = sparkSession.sqlContext.conf.manageFilesourcePartitions
     val result = if (metastoreRelation.hiveQlTable.isPartitioned) {
       val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ccb11543/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 8835b26..84873bb 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -777,7 +777,7 @@ private[hive] class HiveClientImpl(
     val (partCols, schema) = table.schema.map(toHiveColumn).partition { c =>
       table.partitionColumnNames.contains(c.getName)
     }
-    if (table.schema.isEmpty) {
+    if (schema.isEmpty) {
       // This is a hack to preserve existing behavior. Before Spark 2.0, we do not
       // set a default serde here (this was done in Hive), and so if the user provides
       // an empty schema Hive would automatically populate the schema with a single
@@ -831,9 +831,6 @@ private[hive] class HiveClientImpl(
     new HivePartition(ht, tpart)
   }
 
-  // TODO (cloud-fan): the column names in partition specification are always lower cased because
-  // Hive metastore is not case preserving. We should normalize them to the actual column names of
-  // the table, once we store partition spec of data source tables.
   private def fromHivePartition(hp: HivePartition): CatalogTablePartition = {
     val apiPartition = hp.getTPartition
     CatalogTablePartition(

http://git-wip-us.apache.org/repos/asf/spark/blob/ccb11543/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
index d290fe9..6e887d9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
@@ -63,7 +63,7 @@ class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSi
 
   def testCaching(pruningEnabled: Boolean): Unit = {
     test(s"partitioned table is cached when partition pruning is $pruningEnabled") {
-      withSQLConf(SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> pruningEnabled.toString) {
+      withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> pruningEnabled.toString) {
         withTable("test") {
           withTempDir { dir =>
             spark.range(5).selectExpr("id", "id as f1", "id as f2").write

http://git-wip-us.apache.org/repos/asf/spark/blob/ccb11543/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveTablePerfStatsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveTablePerfStatsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveTablePerfStatsSuite.scala
deleted file mode 100644
index 82ee813..0000000
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveTablePerfStatsSuite.scala
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import java.io.File
-
-import org.scalatest.BeforeAndAfterEach
-
-import org.apache.spark.metrics.source.HiveCatalogMetrics
-import org.apache.spark.sql.execution.datasources.FileStatusCache
-import org.apache.spark.sql.QueryTest
-import org.apache.spark.sql.hive.test.TestHiveSingleton
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.test.SQLTestUtils
-
-class HiveTablePerfStatsSuite
-  extends QueryTest with TestHiveSingleton with SQLTestUtils with BeforeAndAfterEach {
-
-  override def beforeEach(): Unit = {
-    super.beforeEach()
-    FileStatusCache.resetForTesting()
-  }
-
-  override def afterEach(): Unit = {
-    super.afterEach()
-    FileStatusCache.resetForTesting()
-  }
-
-  private def setupPartitionedTable(tableName: String, dir: File): Unit = {
-    spark.range(5).selectExpr("id", "id as partCol1", "id as partCol2").write
-      .partitionBy("partCol1", "partCol2")
-      .mode("overwrite")
-      .parquet(dir.getAbsolutePath)
-
-    spark.sql(s"""
-      |create external table $tableName (id long)
-      |partitioned by (partCol1 int, partCol2 int)
-      |stored as parquet
-      |location "${dir.getAbsolutePath}"""".stripMargin)
-    spark.sql(s"msck repair table $tableName")
-  }
-
-  test("partitioned pruned table reports only selected files") {
-    assert(spark.sqlContext.getConf(HiveUtils.CONVERT_METASTORE_PARQUET.key) == "true")
-    withTable("test") {
-      withTempDir { dir =>
-        setupPartitionedTable("test", dir)
-        val df = spark.sql("select * from test")
-        assert(df.count() == 5)
-        assert(df.inputFiles.length == 5)  // unpruned
-
-        val df2 = spark.sql("select * from test where partCol1 = 3 or partCol2 = 4")
-        assert(df2.count() == 2)
-        assert(df2.inputFiles.length == 2)  // pruned, so we have less files
-
-        val df3 = spark.sql("select * from test where PARTCOL1 = 3 or partcol2 = 4")
-        assert(df3.count() == 2)
-        assert(df3.inputFiles.length == 2)
-
-        val df4 = spark.sql("select * from test where partCol1 = 999")
-        assert(df4.count() == 0)
-        assert(df4.inputFiles.length == 0)
-      }
-    }
-  }
-
-  test("lazy partition pruning reads only necessary partition data") {
-    withSQLConf(
-        SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> "true",
-        SQLConf.HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE.key -> "0") {
-      withTable("test") {
-        withTempDir { dir =>
-          setupPartitionedTable("test", dir)
-          HiveCatalogMetrics.reset()
-          spark.sql("select * from test where partCol1 = 999").count()
-          assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 0)
-          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
-
-          HiveCatalogMetrics.reset()
-          spark.sql("select * from test where partCol1 < 2").count()
-          assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 2)
-          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 2)
-
-          HiveCatalogMetrics.reset()
-          spark.sql("select * from test where partCol1 < 3").count()
-          assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 3)
-          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 3)
-
-          // should read all
-          HiveCatalogMetrics.reset()
-          spark.sql("select * from test").count()
-          assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5)
-          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
-
-          // read all should not be cached
-          HiveCatalogMetrics.reset()
-          spark.sql("select * from test").count()
-          assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5)
-          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
-
-          // cache should be disabled
-          assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
-        }
-      }
-    }
-  }
-
-  test("lazy partition pruning with file status caching enabled") {
-    withSQLConf(
-        "spark.sql.hive.filesourcePartitionPruning" -> "true",
-        "spark.sql.hive.filesourcePartitionFileCacheSize" -> "9999999") {
-      withTable("test") {
-        withTempDir { dir =>
-          setupPartitionedTable("test", dir)
-          HiveCatalogMetrics.reset()
-          assert(spark.sql("select * from test where partCol1 = 999").count() == 0)
-          assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 0)
-          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
-          assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
-
-          HiveCatalogMetrics.reset()
-          assert(spark.sql("select * from test where partCol1 < 2").count() == 2)
-          assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 2)
-          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 2)
-          assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
-
-          HiveCatalogMetrics.reset()
-          assert(spark.sql("select * from test where partCol1 < 3").count() == 3)
-          assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 3)
-          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 1)
-          assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 2)
-
-          HiveCatalogMetrics.reset()
-          assert(spark.sql("select * from test").count() == 5)
-          assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5)
-          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 2)
-          assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 3)
-
-          HiveCatalogMetrics.reset()
-          assert(spark.sql("select * from test").count() == 5)
-          assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5)
-          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
-          assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 5)
-        }
-      }
-    }
-  }
-
-  test("file status caching respects refresh table and refreshByPath") {
-    withSQLConf(
-        "spark.sql.hive.filesourcePartitionPruning" -> "true",
-        "spark.sql.hive.filesourcePartitionFileCacheSize" -> "9999999") {
-      withTable("test") {
-        withTempDir { dir =>
-          setupPartitionedTable("test", dir)
-          HiveCatalogMetrics.reset()
-          assert(spark.sql("select * from test").count() == 5)
-          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
-          assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
-
-          HiveCatalogMetrics.reset()
-          spark.sql("refresh table test")
-          assert(spark.sql("select * from test").count() == 5)
-          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
-          assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
-
-          spark.catalog.cacheTable("test")
-          HiveCatalogMetrics.reset()
-          spark.catalog.refreshByPath(dir.getAbsolutePath)
-          assert(spark.sql("select * from test").count() == 5)
-          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
-          assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
-        }
-      }
-    }
-  }
-
-  test("file status cache respects size limit") {
-    withSQLConf(
-        "spark.sql.hive.filesourcePartitionPruning" -> "true",
-        "spark.sql.hive.filesourcePartitionFileCacheSize" -> "1" /* 1 byte */) {
-      withTable("test") {
-        withTempDir { dir =>
-          setupPartitionedTable("test", dir)
-          HiveCatalogMetrics.reset()
-          assert(spark.sql("select * from test").count() == 5)
-          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
-          assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
-          assert(spark.sql("select * from test").count() == 5)
-          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 10)
-          assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
-        }
-      }
-    }
-  }
-
-  test("all partitions read and cached when filesource partition pruning is off") {
-    withSQLConf(SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> "false") {
-      withTable("test") {
-        withTempDir { dir =>
-          setupPartitionedTable("test", dir)
-
-          // We actually query the partitions from hive each time the table is resolved in this
-          // mode. This is kind of terrible, but is needed to preserve the legacy behavior
-          // of doing plan cache validation based on the entire partition set.
-          HiveCatalogMetrics.reset()
-          assert(spark.sql("select * from test where partCol1 = 999").count() == 0)
-          // 5 from table resolution, another 5 from ListingFileCatalog
-          assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 10)
-          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
-
-          HiveCatalogMetrics.reset()
-          assert(spark.sql("select * from test where partCol1 < 2").count() == 2)
-          assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5)
-          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
-
-          HiveCatalogMetrics.reset()
-          assert(spark.sql("select * from test").count() == 5)
-          assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5)
-          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
-        }
-      }
-    }
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org