Posted to commits@spark.apache.org by yh...@apache.org on 2016/11/18 01:31:18 UTC

spark git commit: [SPARK-18360][SQL] default table path of tables in default database should depend on the location of default database

Repository: spark
Updated Branches:
  refs/heads/master b0aa1aa1a -> ce13c2672


[SPARK-18360][SQL] default table path of tables in default database should depend on the location of default database

## What changes were proposed in this pull request?

The current semantics of the warehouse config are:

1. It's a static config, which means you can't change it once your Spark application is launched.
2. Once a database is created, its location won't change, even if the warehouse path config is changed.
3. The default database is a special case: although its location is fixed, the locations of tables created in it are not. If a Spark app starts with warehouse path B (while the location of the default database is A) and a user creates a table `tbl` in the default database, its location will be `B/tbl` instead of `A/tbl`. If the user then changes the warehouse path config to C and creates another table `tbl2`, its location will still be `B/tbl2` instead of `C/tbl2`.

Rule 3 doesn't make sense, and I think we introduced it by mistake rather than intentionally. Data source tables don't follow rule 3 and treat the default database like a normal one (see the sketch below).
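
A minimal, hypothetical sketch of the scenario, modelled on the `SPARK_18360` test added by this patch. The object name `DefaultDbTablePathSketch`, the `/tmp/warehouse-B` path, and the package placement are illustrative assumptions (it sits in `org.apache.spark.sql.hive` so it can use the same internal accessors as the test); the difference only shows up when the default database already exists at a location A different from the current warehouse path B, e.g. with a pre-existing metastore:

```scala
package org.apache.spark.sql.hive

import org.apache.spark.sql.SparkSession

object DefaultDbTablePathSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .config("spark.sql.warehouse.dir", "/tmp/warehouse-B") // "B" in the description above
      .enableHiveSupport()
      .getOrCreate()

    // "A": the location recorded for the default database when it was first created.
    val defaultDbLocation = spark.catalog.getDatabase("default").locationUri

    // A managed Hive serde table with no explicit location.
    spark.sql("CREATE TABLE tbl (i INT) STORED AS PARQUET")
    val tableLocation =
      spark.sharedState.externalCatalog.getTable("default", "tbl").storage.locationUri.get

    // Before this patch: tableLocation is under the current warehouse path ("B/tbl").
    // After this patch:  tableLocation is under defaultDbLocation ("A/tbl"),
    // matching the behaviour of data source tables.
    println(s"default database location: $defaultDbLocation")
    println(s"table location:            $tableLocation")

    spark.stop()
  }
}
```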

This PR fixes Hive serde tables to make them consistent with data source tables.

## How was this patch tested?

HiveSparkSubmitSuite
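
For reference, one way to run this suite locally with Spark's sbt build (the exact invocation may vary with your sbt/Spark version):

```
build/sbt "hive/testOnly *HiveSparkSubmitSuite"
```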

Author: Wenchen Fan <we...@databricks.com>

Closes #15812 from cloud-fan/default-db.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ce13c267
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ce13c267
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ce13c267

Branch: refs/heads/master
Commit: ce13c2672318242748f7520ed4ce6bcfad4fb428
Parents: b0aa1aa
Author: Wenchen Fan <we...@databricks.com>
Authored: Thu Nov 17 17:31:12 2016 -0800
Committer: Yin Huai <yh...@databricks.com>
Committed: Thu Nov 17 17:31:12 2016 -0800

----------------------------------------------------------------------
 .../spark/sql/hive/HiveExternalCatalog.scala    | 237 ++++++++++---------
 .../spark/sql/hive/HiveSparkSubmitSuite.scala   |  76 +++++-
 2 files changed, 190 insertions(+), 123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ce13c267/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 8433058..cacffcf 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -197,136 +197,151 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
 
     if (tableDefinition.tableType == VIEW) {
       client.createTable(tableDefinition, ignoreIfExists)
-    } else if (tableDefinition.provider.get == DDLUtils.HIVE_PROVIDER) {
-      // Here we follow data source tables and put table metadata like provider, schema, etc. in
-      // table properties, so that we can work around the Hive metastore issue about not case
-      // preserving and make Hive serde table support mixed-case column names.
-      val tableWithDataSourceProps = tableDefinition.copy(
-        properties = tableDefinition.properties ++ tableMetaToTableProps(tableDefinition))
-      client.createTable(tableWithDataSourceProps, ignoreIfExists)
     } else {
-      // To work around some hive metastore issues, e.g. not case-preserving, bad decimal type
-      // support, no column nullability, etc., we should do some extra works before saving table
-      // metadata into Hive metastore:
-      //  1. Put table metadata like provider, schema, etc. in table properties.
-      //  2. Check if this table is hive compatible.
-      //    2.1  If it's not hive compatible, set location URI, schema, partition columns and bucket
-      //         spec to empty and save table metadata to Hive.
-      //    2.2  If it's hive compatible, set serde information in table metadata and try to save
-      //         it to Hive. If it fails, treat it as not hive compatible and go back to 2.1
-      val tableProperties = tableMetaToTableProps(tableDefinition)
-
       // Ideally we should not create a managed table with location, but Hive serde table can
       // specify location for managed table. And in [[CreateDataSourceTableAsSelectCommand]] we have
       // to create the table directory and write out data before we create this table, to avoid
       // exposing a partial written table.
       val needDefaultTableLocation = tableDefinition.tableType == MANAGED &&
         tableDefinition.storage.locationUri.isEmpty
+
       val tableLocation = if (needDefaultTableLocation) {
         Some(defaultTablePath(tableDefinition.identifier))
       } else {
         tableDefinition.storage.locationUri
       }
-      // Ideally we should also put `locationUri` in table properties like provider, schema, etc.
-      // However, in older version of Spark we already store table location in storage properties
-      // with key "path". Here we keep this behaviour for backward compatibility.
-      val storagePropsWithLocation = tableDefinition.storage.properties ++
-        tableLocation.map("path" -> _)
-
-      // converts the table metadata to Spark SQL specific format, i.e. set data schema, names and
-      // bucket specification to empty. Note that partition columns are retained, so that we can
-      // call partition-related Hive API later.
-      def newSparkSQLSpecificMetastoreTable(): CatalogTable = {
-        tableDefinition.copy(
-          // Hive only allows directory paths as location URIs while Spark SQL data source tables
-          // also allow file paths. For non-hive-compatible format, we should not set location URI
-          // to avoid hive metastore to throw exception.
-          storage = tableDefinition.storage.copy(
-            locationUri = None,
-            properties = storagePropsWithLocation),
-          schema = tableDefinition.partitionSchema,
-          bucketSpec = None,
-          properties = tableDefinition.properties ++ tableProperties)
+
+      if (tableDefinition.provider.get == DDLUtils.HIVE_PROVIDER) {
+        val tableWithDataSourceProps = tableDefinition.copy(
+          // We can't leave `locationUri` empty and count on Hive metastore to set a default table
+          // location, because Hive metastore uses hive.metastore.warehouse.dir to generate default
+          // table location for tables in default database, while we expect to use the location of
+          // default database.
+          storage = tableDefinition.storage.copy(locationUri = tableLocation),
+          // Here we follow data source tables and put table metadata like provider, schema, etc. in
+          // table properties, so that we can work around the Hive metastore issue about not case
+          // preserving and make Hive serde table support mixed-case column names.
+          properties = tableDefinition.properties ++ tableMetaToTableProps(tableDefinition))
+        client.createTable(tableWithDataSourceProps, ignoreIfExists)
+      } else {
+        createDataSourceTable(
+          tableDefinition.withNewStorage(locationUri = tableLocation),
+          ignoreIfExists)
       }
+    }
+  }
 
-      // converts the table metadata to Hive compatible format, i.e. set the serde information.
-      def newHiveCompatibleMetastoreTable(serde: HiveSerDe): CatalogTable = {
-        val location = if (tableDefinition.tableType == EXTERNAL) {
-          // When we hit this branch, we are saving an external data source table with hive
-          // compatible format, which means the data source is file-based and must have a `path`.
-          require(tableDefinition.storage.locationUri.isDefined,
-            "External file-based data source table must have a `path` entry in storage properties.")
-          Some(new Path(tableDefinition.location).toUri.toString)
-        } else {
-          None
-        }
+  private def createDataSourceTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = {
+    // To work around some hive metastore issues, e.g. not case-preserving, bad decimal type
+    // support, no column nullability, etc., we should do some extra works before saving table
+    // metadata into Hive metastore:
+    //  1. Put table metadata like provider, schema, etc. in table properties.
+    //  2. Check if this table is hive compatible.
+    //    2.1  If it's not hive compatible, set location URI, schema, partition columns and bucket
+    //         spec to empty and save table metadata to Hive.
+    //    2.2  If it's hive compatible, set serde information in table metadata and try to save
+    //         it to Hive. If it fails, treat it as not hive compatible and go back to 2.1
+    val tableProperties = tableMetaToTableProps(table)
+
+    // Ideally we should also put `locationUri` in table properties like provider, schema, etc.
+    // However, in older version of Spark we already store table location in storage properties
+    // with key "path". Here we keep this behaviour for backward compatibility.
+    val storagePropsWithLocation = table.storage.properties ++
+      table.storage.locationUri.map("path" -> _)
+
+    // converts the table metadata to Spark SQL specific format, i.e. set data schema, names and
+    // bucket specification to empty. Note that partition columns are retained, so that we can
+    // call partition-related Hive API later.
+    def newSparkSQLSpecificMetastoreTable(): CatalogTable = {
+      table.copy(
+        // Hive only allows directory paths as location URIs while Spark SQL data source tables
+        // also allow file paths. For non-hive-compatible format, we should not set location URI
+        // to avoid hive metastore to throw exception.
+        storage = table.storage.copy(
+          locationUri = None,
+          properties = storagePropsWithLocation),
+        schema = table.partitionSchema,
+        bucketSpec = None,
+        properties = table.properties ++ tableProperties)
+    }
 
-        tableDefinition.copy(
-          storage = tableDefinition.storage.copy(
-            locationUri = location,
-            inputFormat = serde.inputFormat,
-            outputFormat = serde.outputFormat,
-            serde = serde.serde,
-            properties = storagePropsWithLocation
-          ),
-          properties = tableDefinition.properties ++ tableProperties)
+    // converts the table metadata to Hive compatible format, i.e. set the serde information.
+    def newHiveCompatibleMetastoreTable(serde: HiveSerDe): CatalogTable = {
+      val location = if (table.tableType == EXTERNAL) {
+        // When we hit this branch, we are saving an external data source table with hive
+        // compatible format, which means the data source is file-based and must have a `path`.
+        require(table.storage.locationUri.isDefined,
+          "External file-based data source table must have a `path` entry in storage properties.")
+        Some(new Path(table.location).toUri.toString)
+      } else {
+        None
       }
 
-      val qualifiedTableName = tableDefinition.identifier.quotedString
-      val maybeSerde = HiveSerDe.sourceToSerDe(tableDefinition.provider.get)
-      val skipHiveMetadata = tableDefinition.storage.properties
-        .getOrElse("skipHiveMetadata", "false").toBoolean
-
-      val (hiveCompatibleTable, logMessage) = maybeSerde match {
-        case _ if skipHiveMetadata =>
-          val message =
-            s"Persisting data source table $qualifiedTableName into Hive metastore in" +
-              "Spark SQL specific format, which is NOT compatible with Hive."
-          (None, message)
-
-        // our bucketing is un-compatible with hive(different hash function)
-        case _ if tableDefinition.bucketSpec.nonEmpty =>
-          val message =
-            s"Persisting bucketed data source table $qualifiedTableName into " +
-              "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. "
-          (None, message)
-
-        case Some(serde) =>
-          val message =
-            s"Persisting file based data source table $qualifiedTableName into " +
-              s"Hive metastore in Hive compatible format."
-          (Some(newHiveCompatibleMetastoreTable(serde)), message)
-
-        case _ =>
-          val provider = tableDefinition.provider.get
-          val message =
-            s"Couldn't find corresponding Hive SerDe for data source provider $provider. " +
-              s"Persisting data source table $qualifiedTableName into Hive metastore in " +
-              s"Spark SQL specific format, which is NOT compatible with Hive."
-          (None, message)
-      }
+      table.copy(
+        storage = table.storage.copy(
+          locationUri = location,
+          inputFormat = serde.inputFormat,
+          outputFormat = serde.outputFormat,
+          serde = serde.serde,
+          properties = storagePropsWithLocation
+        ),
+        properties = table.properties ++ tableProperties)
+    }
 
-      (hiveCompatibleTable, logMessage) match {
-        case (Some(table), message) =>
-          // We first try to save the metadata of the table in a Hive compatible way.
-          // If Hive throws an error, we fall back to save its metadata in the Spark SQL
-          // specific way.
-          try {
-            logInfo(message)
-            saveTableIntoHive(table, ignoreIfExists)
-          } catch {
-            case NonFatal(e) =>
-              val warningMessage =
-                s"Could not persist ${tableDefinition.identifier.quotedString} in a Hive " +
-                  "compatible way. Persisting it into Hive metastore in Spark SQL specific format."
-              logWarning(warningMessage, e)
-              saveTableIntoHive(newSparkSQLSpecificMetastoreTable(), ignoreIfExists)
-          }
+    val qualifiedTableName = table.identifier.quotedString
+    val maybeSerde = HiveSerDe.sourceToSerDe(table.provider.get)
+    val skipHiveMetadata = table.storage.properties
+      .getOrElse("skipHiveMetadata", "false").toBoolean
+
+    val (hiveCompatibleTable, logMessage) = maybeSerde match {
+      case _ if skipHiveMetadata =>
+        val message =
+          s"Persisting data source table $qualifiedTableName into Hive metastore in" +
+            "Spark SQL specific format, which is NOT compatible with Hive."
+        (None, message)
+
+      // our bucketing is un-compatible with hive(different hash function)
+      case _ if table.bucketSpec.nonEmpty =>
+        val message =
+          s"Persisting bucketed data source table $qualifiedTableName into " +
+            "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. "
+        (None, message)
+
+      case Some(serde) =>
+        val message =
+          s"Persisting file based data source table $qualifiedTableName into " +
+            s"Hive metastore in Hive compatible format."
+        (Some(newHiveCompatibleMetastoreTable(serde)), message)
+
+      case _ =>
+        val provider = table.provider.get
+        val message =
+          s"Couldn't find corresponding Hive SerDe for data source provider $provider. " +
+            s"Persisting data source table $qualifiedTableName into Hive metastore in " +
+            s"Spark SQL specific format, which is NOT compatible with Hive."
+        (None, message)
+    }
 
-        case (None, message) =>
-          logWarning(message)
-          saveTableIntoHive(newSparkSQLSpecificMetastoreTable(), ignoreIfExists)
-      }
+    (hiveCompatibleTable, logMessage) match {
+      case (Some(table), message) =>
+        // We first try to save the metadata of the table in a Hive compatible way.
+        // If Hive throws an error, we fall back to save its metadata in the Spark SQL
+        // specific way.
+        try {
+          logInfo(message)
+          saveTableIntoHive(table, ignoreIfExists)
+        } catch {
+          case NonFatal(e) =>
+            val warningMessage =
+              s"Could not persist ${table.identifier.quotedString} in a Hive " +
+                "compatible way. Persisting it into Hive metastore in Spark SQL specific format."
+            logWarning(warningMessage, e)
+            saveTableIntoHive(newSparkSQLSpecificMetastoreTable(), ignoreIfExists)
+        }
+
+      case (None, message) =>
+        logWarning(message)
+        saveTableIntoHive(newSparkSQLSpecificMetastoreTable(), ignoreIfExists)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ce13c267/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index fbd7051..a670560 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -24,6 +24,7 @@ import java.util.Date
 import scala.collection.mutable.ArrayBuffer
 import scala.tools.nsc.Properties
 
+import org.apache.hadoop.fs.Path
 import org.scalatest.{BeforeAndAfterEach, Matchers}
 import org.scalatest.concurrent.Timeouts
 import org.scalatest.exceptions.TestFailedDueToTimeoutException
@@ -33,11 +34,12 @@ import org.apache.spark._
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{QueryTest, Row, SparkSession}
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, FunctionResource, JarResource}
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.hive.test.{TestHive, TestHiveContext}
 import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
-import org.apache.spark.sql.types.DecimalType
+import org.apache.spark.sql.types.{DecimalType, StructType}
 import org.apache.spark.util.{ResetSystemProperties, Utils}
 
 /**
@@ -295,6 +297,20 @@ class HiveSparkSubmitSuite
     runSparkSubmit(args)
   }
 
+  test("SPARK-18360: default table path of tables in default database should depend on the " +
+    "location of default database") {
+    val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
+    val args = Seq(
+      "--class", SPARK_18360.getClass.getName.stripSuffix("$"),
+      "--name", "SPARK-18360",
+      "--master", "local-cluster[2,1,1024]",
+      "--conf", "spark.ui.enabled=false",
+      "--conf", "spark.master.rest.enabled=false",
+      "--driver-java-options", "-Dderby.system.durability=test",
+      unusedJar.toString)
+    runSparkSubmit(args)
+  }
+
   // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
   // This is copied from org.apache.spark.deploy.SparkSubmitSuite
   private def runSparkSubmit(args: Seq[String]): Unit = {
@@ -397,11 +413,7 @@ object SetWarehouseLocationTest extends Logging {
   def main(args: Array[String]): Unit = {
     Utils.configTestLog4j("INFO")
 
-    val sparkConf = new SparkConf(loadDefaults = true)
-    val builder = SparkSession.builder()
-      .config(sparkConf)
-      .config("spark.ui.enabled", "false")
-      .enableHiveSupport()
+    val sparkConf = new SparkConf(loadDefaults = true).set("spark.ui.enabled", "false")
     val providedExpectedWarehouseLocation =
       sparkConf.getOption("spark.sql.test.expectedWarehouseDir")
 
@@ -410,7 +422,7 @@ object SetWarehouseLocationTest extends Logging {
         // If spark.sql.test.expectedWarehouseDir is set, the warehouse dir is set
         // through spark-summit. So, neither spark.sql.warehouse.dir nor
         // hive.metastore.warehouse.dir is set at here.
-        (builder.getOrCreate(), warehouseDir)
+        (new TestHiveContext(new SparkContext(sparkConf)).sparkSession, warehouseDir)
       case None =>
         val warehouseLocation = Utils.createTempDir()
         warehouseLocation.delete()
@@ -420,10 +432,10 @@ object SetWarehouseLocationTest extends Logging {
         // spark.sql.warehouse.dir and hive.metastore.warehouse.dir.
         // We are expecting that the value of spark.sql.warehouse.dir will override the
         // value of hive.metastore.warehouse.dir.
-        val session = builder
-          .config("spark.sql.warehouse.dir", warehouseLocation.toString)
-          .config("hive.metastore.warehouse.dir", hiveWarehouseLocation.toString)
-          .getOrCreate()
+        val session = new TestHiveContext(new SparkContext(sparkConf
+          .set("spark.sql.warehouse.dir", warehouseLocation.toString)
+          .set("hive.metastore.warehouse.dir", hiveWarehouseLocation.toString)))
+          .sparkSession
         (session, warehouseLocation.toString)
 
     }
@@ -801,3 +813,43 @@ object SPARK_14244 extends QueryTest {
     }
   }
 }
+
+object SPARK_18360 {
+  def main(args: Array[String]): Unit = {
+    val spark = SparkSession.builder()
+      .config("spark.ui.enabled", "false")
+      .enableHiveSupport().getOrCreate()
+
+    val defaultDbLocation = spark.catalog.getDatabase("default").locationUri
+    assert(new Path(defaultDbLocation) == new Path(spark.sharedState.warehousePath))
+
+    val hiveClient = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+
+    try {
+      val tableMeta = CatalogTable(
+        identifier = TableIdentifier("test_tbl", Some("default")),
+        tableType = CatalogTableType.MANAGED,
+        storage = CatalogStorageFormat.empty,
+        schema = new StructType().add("i", "int"),
+        provider = Some(DDLUtils.HIVE_PROVIDER))
+
+      val newWarehousePath = Utils.createTempDir().getAbsolutePath
+      hiveClient.runSqlHive(s"SET hive.metastore.warehouse.dir=$newWarehousePath")
+      hiveClient.createTable(tableMeta, ignoreIfExists = false)
+      val rawTable = hiveClient.getTable("default", "test_tbl")
+      // Hive will use the value of `hive.metastore.warehouse.dir` to generate default table
+      // location for tables in default database.
+      assert(rawTable.storage.locationUri.get.contains(newWarehousePath))
+      hiveClient.dropTable("default", "test_tbl", ignoreIfNotExists = false, purge = false)
+
+      spark.sharedState.externalCatalog.createTable(tableMeta, ignoreIfExists = false)
+      val readBack = spark.sharedState.externalCatalog.getTable("default", "test_tbl")
+      // Spark SQL will use the location of default database to generate default table
+      // location for tables in default database.
+      assert(readBack.storage.locationUri.get.contains(defaultDbLocation))
+    } finally {
+      hiveClient.dropTable("default", "test_tbl", ignoreIfNotExists = true, purge = false)
+      hiveClient.runSqlHive(s"SET hive.metastore.warehouse.dir=$defaultDbLocation")
+    }
+  }
+}

