You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/01/22 03:41:34 UTC

spark git commit: [SPARK-19153][SQL] DataFrameWriter.saveAsTable work with create partitioned table

Repository: spark
Updated Branches:
  refs/heads/master 6113fe78a -> aa014eb74


[SPARK-19153][SQL] DataFrameWriter.saveAsTable work with create partitioned table

## What changes were proposed in this pull request?

After [SPARK-19107](https://issues.apache.org/jira/browse/SPARK-19153), we now can treat hive as a data source and create hive tables with DataFrameWriter and Catalog. However, the support is not completed, there are still some cases we do not support.

this PR provide DataFrameWriter.saveAsTable work with hive format to create partitioned table.

## How was this patch tested?
unit test added

Author: windpiger <so...@outlook.com>

Closes #16593 from windpiger/saveAsTableWithPartitionedTable.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aa014eb7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aa014eb7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aa014eb7

Branch: refs/heads/master
Commit: aa014eb74bec332ca4d734f2501a4a01a806fa37
Parents: 6113fe7
Author: windpiger <so...@outlook.com>
Authored: Sun Jan 22 11:41:27 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Sun Jan 22 11:41:27 2017 +0800

----------------------------------------------------------------------
 .../apache/spark/sql/hive/HiveStrategies.scala  |  8 -----
 .../CreateHiveTableAsSelectCommand.scala        |  4 +--
 .../spark/sql/hive/execution/HiveDDLSuite.scala | 33 +++++++++++++++-----
 3 files changed, 26 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/aa014eb7/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index b649612..838e6f4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -40,14 +40,6 @@ class DetermineHiveSerde(conf: SQLConf) extends Rule[LogicalPlan] {
       if (t.bucketSpec.isDefined) {
         throw new AnalysisException("Creating bucketed Hive serde table is not supported yet.")
       }
-      if (t.partitionColumnNames.nonEmpty && query.isDefined) {
-        val errorMessage = "A Create Table As Select (CTAS) statement is not allowed to " +
-          "create a partitioned table using Hive's file formats. " +
-          "Please use the syntax of \"CREATE TABLE tableName USING dataSource " +
-          "OPTIONS (...) PARTITIONED BY ...\" to create a partitioned table through a " +
-          "CTAS statement."
-        throw new AnalysisException(errorMessage)
-      }
 
       val defaultStorage = HiveSerDe.getDefaultStorage(conf)
       val options = new HiveOptions(t.storage.properties)

http://git-wip-us.apache.org/repos/asf/spark/blob/aa014eb7/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index ccc2d64..0d30053 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -62,9 +62,7 @@ case class CreateHiveTableAsSelectCommand(
           compressed = tableDesc.storage.compressed)
 
       val withSchema = if (withFormat.schema.isEmpty) {
-        // Hive doesn't support specifying the column list for target table in CTAS
-        // However we don't think SparkSQL should follow that.
-        tableDesc.copy(schema = query.output.toStructType)
+        tableDesc.copy(schema = query.schema)
       } else {
         withFormat
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/aa014eb7/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 235fbd3..41917cc 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -1353,12 +1353,6 @@ class HiveDDLSuite
       sql("INSERT INTO t SELECT 2, 'b'")
       checkAnswer(spark.table("t"), Row(9, "x") :: Row(2, "b") :: Nil)
 
-      val e = intercept[AnalysisException] {
-        Seq(1 -> "a").toDF("i", "j").write.format("hive").partitionBy("i").saveAsTable("t2")
-      }
-      assert(e.message.contains("A Create Table As Select (CTAS) statement is not allowed " +
-        "to create a partitioned table using Hive"))
-
       val e2 = intercept[AnalysisException] {
         Seq(1 -> "a").toDF("i", "j").write.format("hive").bucketBy(4, "i").saveAsTable("t2")
       }
@@ -1371,6 +1365,22 @@ class HiveDDLSuite
     }
   }
 
+  test("create partitioned hive serde table as select") {
+    withTable("t", "t1") {
+      withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+        Seq(10 -> "y").toDF("i", "j").write.format("hive").partitionBy("i").saveAsTable("t")
+        checkAnswer(spark.table("t"), Row("y", 10) :: Nil)
+
+        Seq((1, 2, 3)).toDF("i", "j", "k").write.mode("overwrite").format("hive")
+          .partitionBy("j", "k").saveAsTable("t")
+        checkAnswer(spark.table("t"), Row(1, 2, 3) :: Nil)
+
+        spark.sql("create table t1 using hive partitioned by (i) as select 1 as i, 'a' as j")
+        checkAnswer(spark.table("t1"), Row("a", 1) :: Nil)
+      }
+    }
+  }
+
   test("read/write files with hive data source is not allowed") {
     withTempDir { dir =>
       val e = intercept[AnalysisException] {
@@ -1390,7 +1400,7 @@ class HiveDDLSuite
       spark.sessionState.catalog.getTableMetadata(TableIdentifier(tblName)).schema.map(_.name)
     }
 
-    withTable("t", "t1", "t2", "t3", "t4") {
+    withTable("t", "t1", "t2", "t3", "t4", "t5", "t6") {
       sql("CREATE TABLE t(a int, b int, c int, d int) USING parquet PARTITIONED BY (d, b)")
       assert(getTableColumns("t") == Seq("a", "c", "d", "b"))
 
@@ -1411,7 +1421,14 @@ class HiveDDLSuite
       sql("CREATE TABLE t4(a int, b int, c int, d int) USING hive PARTITIONED BY (d, b)")
       assert(getTableColumns("t4") == Seq("a", "c", "d", "b"))
 
-      // TODO: add test for creating partitioned hive serde table as select, once we support it.
+      withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+        sql("CREATE TABLE t5 USING hive PARTITIONED BY (d, b) AS SELECT 1 a, 1 b, 1 c, 1 d")
+        assert(getTableColumns("t5") == Seq("a", "c", "d", "b"))
+
+        Seq((1, 1, 1, 1)).toDF("a", "b", "c", "d").write.format("hive")
+          .partitionBy("d", "b").saveAsTable("t6")
+        assert(getTableColumns("t6") == Seq("a", "c", "d", "b"))
+      }
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org