Posted to commits@spark.apache.org by ma...@apache.org on 2015/05/23 00:40:20 UTC

spark git commit: [SPARK-7270] [SQL] Consider dynamic partition when inserting into hive table

Repository: spark
Updated Branches:
  refs/heads/master e4aef91fe -> 126d7235d


[SPARK-7270] [SQL] Consider dynamic partition when inserting into hive table

JIRA: https://issues.apache.org/jira/browse/SPARK-7270
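Before this change, the analyzer compared the child plan's output against
`table.attributes ++ table.partitionKeys` even when some partition columns were
bound statically in the PARTITION clause, so a query mixing static and dynamic
partitions could pick up spurious casts or fail to resolve. A minimal sketch of
the affected scenario, using the tables from the new test (assuming a
Hive-enabled context named `sqlContext`):

    sqlContext.sql("CREATE TABLE test_partition (a STRING) PARTITIONED BY (b BIGINT, c STRING)")
    sqlContext.sql("CREATE TABLE ptest (a STRING, b BIGINT, c STRING)")
    // b is bound statically, c is dynamic: the SELECT supplies values only
    // for column `a` and the dynamic partition column `c`.
    sqlContext.sql(
      """INSERT OVERWRITE TABLE test_partition PARTITION (b=1, c)
        |SELECT 'a', 'c' FROM ptest
      """.stripMargin)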

Author: Liang-Chi Hsieh <vi...@gmail.com>

Closes #5864 from viirya/dyn_partition_insert and squashes the following commits:

b5627df [Liang-Chi Hsieh] For comments.
3b21e4b [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into dyn_partition_insert
8a4352d [Liang-Chi Hsieh] Consider dynamic partition when inserting into hive table.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/126d7235
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/126d7235
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/126d7235

Branch: refs/heads/master
Commit: 126d7235de649ea5619dee6ad3a70970ee90df93
Parents: e4aef91
Author: Liang-Chi Hsieh <vi...@gmail.com>
Authored: Fri May 22 15:39:58 2015 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Fri May 22 15:39:58 2015 -0700

----------------------------------------------------------------------
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 18 +++++++++++++-----
 .../sql/hive/execution/HiveQuerySuite.scala     | 20 ++++++++++++++++++++
 2 files changed, 33 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/126d7235/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 5b68400..425a400 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -516,17 +516,19 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
     def castChildOutput(p: InsertIntoTable, table: MetastoreRelation, child: LogicalPlan)
       : LogicalPlan = {
       val childOutputDataTypes = child.output.map(_.dataType)
+      val numDynamicPartitions = p.partition.values.count(_.isEmpty)
       val tableOutputDataTypes =
-        (table.attributes ++ table.partitionKeys).take(child.output.length).map(_.dataType)
+        (table.attributes ++ table.partitionKeys.takeRight(numDynamicPartitions))
+          .take(child.output.length).map(_.dataType)
 
       if (childOutputDataTypes == tableOutputDataTypes) {
-        p
+        InsertIntoHiveTable(table, p.partition, p.child, p.overwrite, p.ifNotExists)
       } else if (childOutputDataTypes.size == tableOutputDataTypes.size &&
         childOutputDataTypes.zip(tableOutputDataTypes)
           .forall { case (left, right) => left.sameType(right) }) {
         // If both types ignoring nullability of ArrayType, MapType, StructType are the same,
         // use InsertIntoHiveTable instead of InsertIntoTable.
-        InsertIntoHiveTable(p.table, p.partition, p.child, p.overwrite, p.ifNotExists)
+        InsertIntoHiveTable(table, p.partition, p.child, p.overwrite, p.ifNotExists)
       } else {
         // Only do the casting when child output data types differ from table output data types.
         val castedChildOutput = child.output.zip(table.output).map {
@@ -561,7 +563,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
  * because Hive table doesn't have nullability for ARRAY, MAP, STRUCT types.
  */
 private[hive] case class InsertIntoHiveTable(
-    table: LogicalPlan,
+    table: MetastoreRelation,
     partition: Map[String, Option[String]],
     child: LogicalPlan,
     overwrite: Boolean,
@@ -571,7 +573,13 @@ private[hive] case class InsertIntoHiveTable(
   override def children: Seq[LogicalPlan] = child :: Nil
   override def output: Seq[Attribute] = child.output
 
-  override lazy val resolved: Boolean = childrenResolved && child.output.zip(table.output).forall {
+  val numDynamicPartitions = partition.values.count(_.isEmpty)
+
+  // This is the expected schema of the table prepared to be inserted into,
+  // including dynamic partition columns.
+  val tableOutput = table.attributes ++ table.partitionKeys.takeRight(numDynamicPartitions)
+
+  override lazy val resolved: Boolean = childrenResolved && child.output.zip(tableOutput).forall {
     case (childAttr, tableAttr) => childAttr.dataType.sameType(tableAttr.dataType)
   }
 }

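The key change above is how the expected table schema is computed: a partition
column bound to a value in the PARTITION clause is supplied by the clause, not
by the SELECT, so only the dynamic partition keys should be appended to the
table attributes before comparing types. Because Hive requires static partition
specs to precede dynamic ones, the dynamic keys always form a suffix of
`partitionKeys`, which is what `takeRight(numDynamicPartitions)` relies on. The
same expected schema (`tableOutput`) now also drives `resolved`, and `table` is
narrowed from `LogicalPlan` to `MetastoreRelation` so both branches can build
an `InsertIntoHiveTable` directly. A self-contained sketch of the schema
computation (plain Scala; the column names are illustrative stand-ins for the
resolved attributes):

    object DynamicPartitionSchema extends App {
      val attributes    = Seq("a")       // regular table columns
      val partitionKeys = Seq("b", "c")  // partition columns, in declared order
      // PARTITION (b=1, c): b is static (Some value), c is dynamic (None)
      val partition = Map("b" -> Some("1"), "c" -> Option.empty[String])
      val numDynamicPartitions = partition.values.count(_.isEmpty)  // 1
      val tableOutput = attributes ++ partitionKeys.takeRight(numDynamicPartitions)
      println(tableOutput)  // List(a, c) -- lines up with SELECT 'a', 'c'
    }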
http://git-wip-us.apache.org/repos/asf/spark/blob/126d7235/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 65c6ef0..4af31d4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 
 import org.apache.spark.{SparkFiles, SparkException}
 import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
+import org.apache.spark.sql.catalyst.expressions.Cast
 import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.test.TestHive
@@ -415,6 +416,25 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
       |SELECT * FROM createdtable;
     """.stripMargin)
 
+  test("SPARK-7270: consider dynamic partition when comparing table output") {
+    sql(s"CREATE TABLE test_partition (a STRING) PARTITIONED BY (b BIGINT, c STRING)")
+    sql(s"CREATE TABLE ptest (a STRING, b BIGINT, c STRING)")
+
+    val analyzedPlan = sql(
+      """
+        |INSERT OVERWRITE table test_partition PARTITION (b=1, c)
+        |SELECT 'a', 'c' from ptest
+      """.stripMargin).queryExecution.analyzed
+
+    assertResult(false, "Incorrect cast detected\n" + analyzedPlan) {
+      var hasCast = false
+      analyzedPlan.collect {
+        case p: Project => p.transformExpressionsUp { case c: Cast => hasCast = true; c }
+      }
+      hasCast
+    }
+  }
+
   createQueryTest("transform",
     "SELECT TRANSFORM (key) USING 'cat' AS (tKey) FROM src")
 


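The new test pins down the regression directly: after analysis, the insert
plan's projections should contain no Cast at all, because the child output
('a': STRING, 'c': STRING) now lines up with the expected schema (column a
STRING plus the dynamic partition column c STRING). A hedged sketch of the same
check factored into a reusable helper (the helper name is illustrative, not
part of the patch):

    import org.apache.spark.sql.catalyst.expressions.Cast
    import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}

    // Returns true if any Project node in the analyzed plan contains a Cast.
    def containsCast(plan: LogicalPlan): Boolean = {
      var hasCast = false
      plan.collect {
        case p: Project => p.transformExpressionsUp { case c: Cast => hasCast = true; c }
      }
      hasCast
    }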