Posted to commits@spark.apache.org by we...@apache.org on 2017/01/10 11:26:58 UTC

spark git commit: [SPARK-19107][SQL] support creating hive table with DataFrameWriter and Catalog

Repository: spark
Updated Branches:
  refs/heads/master b0e5840d4 -> b0319c2ec


[SPARK-19107][SQL] support creating hive table with DataFrameWriter and Catalog

## What changes were proposed in this pull request?

After the CREATE TABLE syntax was unified in https://github.com/apache/spark/pull/16296, it is now straightforward to support creating Hive tables with `DataFrameWriter` and `Catalog`.

This PR removes the Hive provider check from `DataFrameWriter.saveAsTable` and `Catalog.createExternalTable`, and adds tests. It also adds a guard so that the `hive` data source cannot be used to read or write files directly via `DataFrameReader.load` and `DataFrameWriter.save`.
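
For example, the following now works (a minimal sketch mirroring the new tests in `HiveDDLSuite`; the table names and path are illustrative, and a Hive-enabled Spark build is assumed):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

val spark = SparkSession.builder()
  .enableHiveSupport()  // the "hive" provider requires Hive support
  .getOrCreate()
import spark.implicits._

// DataFrameWriter: create a Hive serde table (previously rejected with
// "Cannot create hive serde table with saveAsTable API").
Seq(1 -> "a").toDF("i", "j")
  .write.format("hive").option("fileFormat", "parquet").saveAsTable("hive_tbl")

// Catalog: create an external Hive serde table over a path (previously
// rejected with "Cannot create hive serde table with createExternalTable API").
// The path below is a placeholder.
spark.catalog.createExternalTable(
  "hive_ext_tbl",
  "hive",
  new StructType().add("i", "int"),
  Map("path" -> "/tmp/hive_ext_tbl", "fileFormat" -> "parquet"))
```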

## How was this patch tested?

New tests in `HiveDDLSuite`.

Author: Wenchen Fan <we...@databricks.com>

Closes #16487 from cloud-fan/hive-table.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b0319c2e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b0319c2e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b0319c2e

Branch: refs/heads/master
Commit: b0319c2ecb51bb97c3228afa4a384572b9ffbce6
Parents: b0e5840
Author: Wenchen Fan <we...@databricks.com>
Authored: Tue Jan 10 19:26:51 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Tue Jan 10 19:26:51 2017 +0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/DataFrameReader.scala  | 14 ++--
 .../org/apache/spark/sql/DataFrameWriter.scala  | 11 ++-
 .../apache/spark/sql/internal/CatalogImpl.scala |  4 -
 .../spark/sql/internal/CatalogSuite.scala       |  7 --
 .../sql/hive/MetastoreDataSourcesSuite.scala    | 20 -----
 .../spark/sql/hive/execution/HiveDDLSuite.scala | 77 ++++++++++++++++++++
 6 files changed, 93 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b0319c2e/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 365b50d..cd83836 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -28,6 +28,7 @@ import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions}
 import org.apache.spark.sql.execution.LogicalRDD
+import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.datasources.jdbc._
 import org.apache.spark.sql.execution.datasources.json.InferSchema
@@ -143,6 +144,11 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    */
   @scala.annotation.varargs
   def load(paths: String*): DataFrame = {
+    if (source.toLowerCase == DDLUtils.HIVE_PROVIDER) {
+      throw new AnalysisException("Hive data source can only be used with tables, you can not " +
+        "read files of Hive data source directly.")
+    }
+
     sparkSession.baseRelationToDataFrame(
       DataSource.apply(
         sparkSession,
@@ -160,7 +166,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    */
   def jdbc(url: String, table: String, properties: Properties): DataFrame = {
     // properties should override settings in extraOptions.
-    this.extraOptions = this.extraOptions ++ properties.asScala
+    this.extraOptions ++= properties.asScala
     // explicit url and dbtable should override all
     this.extraOptions += (JDBCOptions.JDBC_URL -> url, JDBCOptions.JDBC_TABLE_NAME -> table)
     format("jdbc").load()
@@ -469,9 +475,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * @since 1.4.0
    */
   def table(tableName: String): DataFrame = {
-    Dataset.ofRows(sparkSession,
-      sparkSession.sessionState.catalog.lookupRelation(
-        sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)))
+    sparkSession.table(tableName)
   }
 
   /**
@@ -550,6 +554,6 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
 
   private var userSpecifiedSchema: Option[StructType] = None
 
-  private var extraOptions = new scala.collection.mutable.HashMap[String, String]
+  private val extraOptions = new scala.collection.mutable.HashMap[String, String]
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/b0319c2e/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 3127ebf..82331fd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -205,6 +205,11 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * @since 1.4.0
    */
   def save(): Unit = {
+    if (source.toLowerCase == DDLUtils.HIVE_PROVIDER) {
+      throw new AnalysisException("Hive data source can only be used with tables, you can not " +
+        "write files of Hive data source directly.")
+    }
+
     assertNotBucketed("save")
     val dataSource = DataSource(
       df.sparkSession,
@@ -361,10 +366,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   }
 
   private def saveAsTable(tableIdent: TableIdentifier): Unit = {
-    if (source.toLowerCase == DDLUtils.HIVE_PROVIDER) {
-      throw new AnalysisException("Cannot create hive serde table with saveAsTable API")
-    }
-
     val catalog = df.sparkSession.sessionState.catalog
     val tableExists = catalog.tableExists(tableIdent)
     val db = tableIdent.database.getOrElse(catalog.getCurrentDatabase)
@@ -385,6 +386,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         }
         EliminateSubqueryAliases(catalog.lookupRelation(tableIdentWithDB)) match {
           // Only do the check if the table is a data source table (the relation is a BaseRelation).
+          // TODO(cloud-fan): also check hive table relation here when we support overwrite mode
+          // for creating hive tables.
           case LogicalRelation(dest: BaseRelation, _, _) if srcRelations.contains(dest) =>
             throw new AnalysisException(
               s"Cannot overwrite table $tableName that is also being read from")

http://git-wip-us.apache.org/repos/asf/spark/blob/b0319c2e/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 41ed9d7..8244b21 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -347,10 +347,6 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
       source: String,
       schema: StructType,
       options: Map[String, String]): DataFrame = {
-    if (source.toLowerCase == "hive") {
-      throw new AnalysisException("Cannot create hive serde table with createExternalTable API.")
-    }
-
     val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
     val tableDesc = CatalogTable(
       identifier = tableIdent,

http://git-wip-us.apache.org/repos/asf/spark/blob/b0319c2e/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index 89ec162..5dd0454 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -322,13 +322,6 @@ class CatalogSuite
     assert(e2.message == "Cannot create a file-based external data source table without path")
   }
 
-  test("createExternalTable should fail if provider is hive") {
-    val e = intercept[AnalysisException] {
-      spark.catalog.createExternalTable("tbl", "HiVe", Map.empty[String, String])
-    }
-    assert(e.message.contains("Cannot create hive serde table with createExternalTable API"))
-  }
-
   test("dropTempView should not un-cache and drop metastore table if a same-name table exists") {
     withTable("same_name") {
       spark.range(10).write.saveAsTable("same_name")

http://git-wip-us.apache.org/repos/asf/spark/blob/b0319c2e/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index aed825e..13ef79e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -1169,26 +1169,6 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
     }
   }
 
-  test("save API - format hive") {
-    withTempDir { dir =>
-      val path = dir.getCanonicalPath
-      val e = intercept[ClassNotFoundException] {
-        spark.range(10).write.format("hive").mode(SaveMode.Ignore).save(path)
-      }.getMessage
-      assert(e.contains("Failed to find data source: hive"))
-    }
-  }
-
-  test("saveAsTable API - format hive") {
-    val tableName = "tab1"
-    withTable(tableName) {
-      val e = intercept[AnalysisException] {
-        spark.range(10).write.format("hive").mode(SaveMode.Overwrite).saveAsTable(tableName)
-      }.getMessage
-      assert(e.contains("Cannot create hive serde table with saveAsTable API"))
-    }
-  }
-
   test("create a temp view using hive") {
     val tableName = "tab1"
     withTable (tableName) {

http://git-wip-us.apache.org/repos/asf/spark/blob/b0319c2e/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 3ac07d0..7728528 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.types.StructType
 
 class HiveDDLSuite
   extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach {
@@ -1289,4 +1290,80 @@ class HiveDDLSuite
       }
     }
   }
+
+  test("create hive serde table with Catalog") {
+    withTable("t") {
+      withTempDir { dir =>
+        val df = spark.catalog.createExternalTable(
+          "t",
+          "hive",
+          new StructType().add("i", "int"),
+          Map("path" -> dir.getCanonicalPath, "fileFormat" -> "parquet"))
+        assert(df.collect().isEmpty)
+
+        val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+        assert(DDLUtils.isHiveTable(table))
+        assert(table.storage.inputFormat ==
+          Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"))
+        assert(table.storage.outputFormat ==
+          Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"))
+        assert(table.storage.serde ==
+          Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
+
+        sql("INSERT INTO t SELECT 1")
+        checkAnswer(spark.table("t"), Row(1))
+      }
+    }
+  }
+
+  test("create hive serde table with DataFrameWriter.saveAsTable") {
+    withTable("t", "t2") {
+      Seq(1 -> "a").toDF("i", "j")
+        .write.format("hive").option("fileFormat", "avro").saveAsTable("t")
+      checkAnswer(spark.table("t"), Row(1, "a"))
+
+      val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+      assert(DDLUtils.isHiveTable(table))
+      assert(table.storage.inputFormat ==
+        Some("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat"))
+      assert(table.storage.outputFormat ==
+        Some("org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat"))
+      assert(table.storage.serde ==
+        Some("org.apache.hadoop.hive.serde2.avro.AvroSerDe"))
+
+      sql("INSERT INTO t SELECT 2, 'b'")
+      checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Nil)
+
+      val e = intercept[AnalysisException] {
+        Seq(1 -> "a").toDF("i", "j").write.format("hive").partitionBy("i").saveAsTable("t2")
+      }
+      assert(e.message.contains("A Create Table As Select (CTAS) statement is not allowed " +
+        "to create a partitioned table using Hive"))
+
+      val e2 = intercept[AnalysisException] {
+        Seq(1 -> "a").toDF("i", "j").write.format("hive").bucketBy(4, "i").saveAsTable("t2")
+      }
+      assert(e2.message.contains("Creating bucketed Hive serde table is not supported yet"))
+
+      val e3 = intercept[AnalysisException] {
+        spark.table("t").write.format("hive").mode("overwrite").saveAsTable("t")
+      }
+      assert(e3.message.contains(
+        "CTAS for hive serde tables does not support append or overwrite semantics"))
+    }
+  }
+
+  test("read/write files with hive data source is not allowed") {
+    withTempDir { dir =>
+      val e = intercept[AnalysisException] {
+        spark.read.format("hive").load(dir.getAbsolutePath)
+      }
+      assert(e.message.contains("Hive data source can only be used with tables"))
+
+      val e2 = intercept[AnalysisException] {
+        Seq(1 -> "a").toDF("i", "j").write.format("hive").save(dir.getAbsolutePath)
+      }
+      assert(e2.message.contains("Hive data source can only be used with tables"))
+    }
+  }
 }

