You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/11/03 18:48:12 UTC

spark git commit: [SPARK-18244][SQL] Rename partitionProviderIsHive -> tracksPartitionsInCatalog

Repository: spark
Updated Branches:
  refs/heads/master 27daf6bcd -> b17057c0a


[SPARK-18244][SQL] Rename partitionProviderIsHive -> tracksPartitionsInCatalog

## What changes were proposed in this pull request?
This patch renames partitionProviderIsHive to tracksPartitionsInCatalog, as the old name was too Hive specific.

## How was this patch tested?
Should be covered by existing tests.

Author: Reynold Xin <rx...@databricks.com>

Closes #15750 from rxin/SPARK-18244.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b17057c0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b17057c0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b17057c0

Branch: refs/heads/master
Commit: b17057c0a69b9c56e503483d97f5dc209eef0884
Parents: 27daf6b
Author: Reynold Xin <rx...@databricks.com>
Authored: Thu Nov 3 11:48:05 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Thu Nov 3 11:48:05 2016 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/catalog/interface.scala  |  9 +++++----
 .../sql/catalyst/trees/TreeNodeSuite.scala      |  2 +-
 .../command/createDataSourceTables.scala        |  2 +-
 .../spark/sql/execution/command/ddl.scala       |  4 ++--
 .../spark/sql/execution/command/tables.scala    |  2 +-
 .../sql/execution/datasources/DataSource.scala  |  2 +-
 .../datasources/DataSourceStrategy.scala        |  7 ++++---
 .../InsertIntoHadoopFsRelationCommand.scala     |  6 +-----
 .../spark/sql/execution/command/DDLSuite.scala  |  2 +-
 .../spark/sql/hive/HiveExternalCatalog.scala    | 21 ++++++++++++--------
 10 files changed, 30 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b17057c0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 7c3bec8..34748a0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -138,8 +138,9 @@ case class BucketSpec(
  *                 Can be None if this table is a View, should be "hive" for hive serde tables.
  * @param unsupportedFeatures is a list of string descriptions of features that are used by the
  *        underlying table but not supported by Spark SQL yet.
- * @param partitionProviderIsHive whether this table's partition metadata is stored in the Hive
- *                                metastore.
+ * @param tracksPartitionsInCatalog whether this table's partition metadata is stored in the
+ *                                  catalog. If false, it is inferred automatically based on file
+ *                                  structure.
  */
 case class CatalogTable(
     identifier: TableIdentifier,
@@ -158,7 +159,7 @@ case class CatalogTable(
     viewText: Option[String] = None,
     comment: Option[String] = None,
     unsupportedFeatures: Seq[String] = Seq.empty,
-    partitionProviderIsHive: Boolean = false) {
+    tracksPartitionsInCatalog: Boolean = false) {
 
   /** schema of this table's partition columns */
   def partitionSchema: StructType = StructType(schema.filter {
@@ -217,7 +218,7 @@ case class CatalogTable(
         if (properties.nonEmpty) s"Properties: $tableProperties" else "",
         if (stats.isDefined) s"Statistics: ${stats.get.simpleString}" else "",
         s"$storage",
-        if (partitionProviderIsHive) "Partition Provider: Hive" else "")
+        if (tracksPartitionsInCatalog) "Partition Provider: Catalog" else "")
 
     output.filter(_.nonEmpty).mkString("CatalogTable(\n\t", "\n\t", ")")
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/b17057c0/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
index 3eff12f..af1eaa1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
@@ -489,7 +489,7 @@ class TreeNodeSuite extends SparkFunSuite {
         "owner" -> "",
         "createTime" -> 0,
         "lastAccessTime" -> -1,
-        "partitionProviderIsHive" -> false,
+        "tracksPartitionsInCatalog" -> false,
         "properties" -> JNull,
         "unsupportedFeatures" -> List.empty[String]))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b17057c0/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index d4b2827..7e16e43 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -92,7 +92,7 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
       // If metastore partition management for file source tables is enabled, we start off with
       // partition provider hive, but no partitions in the metastore. The user has to call
       // `msck repair table` to populate the table partitions.
-      partitionProviderIsHive = partitionColumnNames.nonEmpty &&
+      tracksPartitionsInCatalog = partitionColumnNames.nonEmpty &&
         sparkSession.sessionState.conf.manageFilesourcePartitions)
     // We will return Nil or throw exception at the beginning if the table already exists, so when
     // we reach here, the table should not exist and we should set `ignoreIfExists` to false.

http://git-wip-us.apache.org/repos/asf/spark/blob/b17057c0/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 52af915..b4d3ca1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -523,7 +523,7 @@ case class AlterTableRecoverPartitionsCommand(
     // Updates the table to indicate that its partition metadata is stored in the Hive metastore.
     // This is always the case for Hive format tables, but is not true for Datasource tables created
     // before Spark 2.1 unless they are converted via `msck repair table`.
-    spark.sessionState.catalog.alterTable(table.copy(partitionProviderIsHive = true))
+    spark.sessionState.catalog.alterTable(table.copy(tracksPartitionsInCatalog = true))
     catalog.refreshTable(tableName)
     logInfo(s"Recovered all partitions ($total).")
     Seq.empty[Row]
@@ -702,7 +702,7 @@ object DDLUtils {
         s"$action is not allowed on $tableName since filesource partition management is " +
           "disabled (spark.sql.hive.manageFilesourcePartitions = false).")
     }
-    if (!table.partitionProviderIsHive && isDatasourceTable(table)) {
+    if (!table.tracksPartitionsInCatalog && isDatasourceTable(table)) {
       throw new AnalysisException(
         s"$action is not allowed on $tableName since its partition metadata is not stored in " +
           "the Hive metastore. To import this information into the metastore, run " +

http://git-wip-us.apache.org/repos/asf/spark/blob/b17057c0/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index f32c956..00c646b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -467,7 +467,7 @@ case class DescribeTableCommand(
 
     if (table.tableType == CatalogTableType.VIEW) describeViewInfo(table, buffer)
 
-    if (DDLUtils.isDatasourceTable(table) && table.partitionProviderIsHive) {
+    if (DDLUtils.isDatasourceTable(table) && table.tracksPartitionsInCatalog) {
       append(buffer, "Partition Provider:", "Hive", "")
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/b17057c0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 0b50448..5266611 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -308,7 +308,7 @@ case class DataSource(
         }
 
         val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions &&
-            catalogTable.isDefined && catalogTable.get.partitionProviderIsHive) {
+            catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog) {
           new CatalogFileIndex(
             sparkSession,
             catalogTable.get,

http://git-wip-us.apache.org/repos/asf/spark/blob/b17057c0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index e87998f..a548e88 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -182,9 +182,10 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
           "Cannot overwrite a path that is also being read from.")
       }
 
-      val overwritingSinglePartition = (overwrite.specificPartition.isDefined &&
+      val overwritingSinglePartition =
+        overwrite.specificPartition.isDefined &&
         t.sparkSession.sessionState.conf.manageFilesourcePartitions &&
-        l.catalogTable.get.partitionProviderIsHive)
+        l.catalogTable.get.tracksPartitionsInCatalog
 
       val effectiveOutputPath = if (overwritingSinglePartition) {
         val partition = t.sparkSession.sessionState.catalog.getPartition(
@@ -203,7 +204,7 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
       def refreshPartitionsCallback(updatedPartitions: Seq[TablePartitionSpec]): Unit = {
         if (l.catalogTable.isDefined && updatedPartitions.nonEmpty &&
             l.catalogTable.get.partitionColumnNames.nonEmpty &&
-            l.catalogTable.get.partitionProviderIsHive) {
+            l.catalogTable.get.tracksPartitionsInCatalog) {
           val metastoreUpdater = AlterTableAddPartitionCommand(
             l.catalogTable.get.identifier,
             updatedPartitions.map(p => (p, None)),

http://git-wip-us.apache.org/repos/asf/spark/blob/b17057c0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 927c0c5..9c75e2a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -31,11 +31,7 @@ import org.apache.spark.sql.execution.command.RunnableCommand
 
 /**
  * A command for writing data to a [[HadoopFsRelation]].  Supports both overwriting and appending.
- * Writing to dynamic partitions is also supported.  Each [[InsertIntoHadoopFsRelationCommand]]
- * issues a single write job, and owns a UUID that identifies this job.  Each concrete
- * implementation of [[HadoopFsRelation]] should use this UUID together with task id to generate
- * unique file path for each task output file.  This UUID is passed to executor side via a
- * property named `spark.sql.sources.writeJobUUID`.
+ * Writing to dynamic partitions is also supported.
  */
 case class InsertIntoHadoopFsRelationCommand(
     outputPath: Path,

http://git-wip-us.apache.org/repos/asf/spark/blob/b17057c0/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index d4d0014..52b09c5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -96,7 +96,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       provider = Some("hive"),
       partitionColumnNames = Seq("a", "b"),
       createTime = 0L,
-      partitionProviderIsHive = true)
+      tracksPartitionsInCatalog = true)
   }
 
   private def createTable(catalog: SessionCatalog, name: TableIdentifier): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/b17057c0/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index ebba203..64ba526 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -323,8 +323,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
 
     val properties = new scala.collection.mutable.HashMap[String, String]
     properties.put(DATASOURCE_PROVIDER, provider)
-    if (table.partitionProviderIsHive) {
-      properties.put(TABLE_PARTITION_PROVIDER, "hive")
+    if (table.tracksPartitionsInCatalog) {
+      properties.put(TABLE_PARTITION_PROVIDER, TABLE_PARTITION_PROVIDER_CATALOG)
     }
 
     // Serialized JSON schema string may be too long to be stored into a single metastore table
@@ -489,10 +489,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
         updateLocationInStorageProps(oldTableDef, newLocation).copy(locationUri = newLocation)
       }
 
-      val partitionProviderProp = if (tableDefinition.partitionProviderIsHive) {
-        TABLE_PARTITION_PROVIDER -> "hive"
+      val partitionProviderProp = if (tableDefinition.tracksPartitionsInCatalog) {
+        TABLE_PARTITION_PROVIDER -> TABLE_PARTITION_PROVIDER_CATALOG
       } else {
-        TABLE_PARTITION_PROVIDER -> "builtin"
+        TABLE_PARTITION_PROVIDER -> TABLE_PARTITION_PROVIDER_FILESYSTEM
       }
 
       // Sets the `schema`, `partitionColumnNames` and `bucketSpec` from the old table definition,
@@ -537,7 +537,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       table
     } else {
       getProviderFromTableProperties(table).map { provider =>
-        assert(provider != "hive", "Hive serde table should not save provider in table properties.")
+        assert(provider != TABLE_PARTITION_PROVIDER_CATALOG,
+          "Hive serde table should not save provider in table properties.")
         // Internally we store the table location in storage properties with key "path" for data
         // source tables. Here we set the table location to `locationUri` field and filter out the
         // path option in storage properties, to avoid exposing this concept externally.
@@ -545,6 +546,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
           val tableLocation = getLocationFromStorageProps(table)
           updateLocationInStorageProps(table, None).copy(locationUri = tableLocation)
         }
+        val partitionProvider = table.properties.get(TABLE_PARTITION_PROVIDER)
 
         table.copy(
           storage = storageWithLocation,
@@ -552,9 +554,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
           provider = Some(provider),
           partitionColumnNames = getPartitionColumnsFromTableProperties(table),
           bucketSpec = getBucketSpecFromTableProperties(table),
-          partitionProviderIsHive = table.properties.get(TABLE_PARTITION_PROVIDER) == Some("hive"))
+          tracksPartitionsInCatalog = partitionProvider == Some(TABLE_PARTITION_PROVIDER_CATALOG)
+        )
       } getOrElse {
-        table.copy(provider = Some("hive"), partitionProviderIsHive = true)
+        table.copy(provider = Some("hive"), tracksPartitionsInCatalog = true)
       }
     }
 
@@ -851,6 +854,8 @@ object HiveExternalCatalog {
   val STATISTICS_COL_STATS_PREFIX = STATISTICS_PREFIX + "colStats."
 
   val TABLE_PARTITION_PROVIDER = SPARK_SQL_PREFIX + "partitionProvider"
+  val TABLE_PARTITION_PROVIDER_CATALOG = "catalog"
+  val TABLE_PARTITION_PROVIDER_FILESYSTEM = "filesystem"
 
 
   def getProviderFromTableProperties(metadata: CatalogTable): Option[String] = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org