Posted to commits@spark.apache.org by ma...@apache.org on 2015/04/11 01:05:22 UTC

spark git commit: [SPARK-6851][SQL] Create new instance for each converted parquet relation

Repository: spark
Updated Branches:
  refs/heads/master 68ecdb7f9 -> 23d5f8864


[SPARK-6851][SQL] Create new instance for each converted parquet relation

Otherwise, when the same converted relation instance is used for both sides of a self-join, join predicates are rewritten so that both sides refer to the same attributes (e.g. `a#1 = a#2` becomes the trivially true `a#3 = a#3`), at which point the query is no longer valid.
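
[Editorial note] For intuition, a minimal repro sketch; the table `t` and column `id` are hypothetical names, not part of this patch. If both sides of the self-join resolve to one shared relation instance, `a.id` and `b.id` resolve to the same attribute, and the analyzed predicate collapses into a tautology:

    // Hypothetical repro in a spark-shell with a HiveContext bound to
    // `sqlContext`. Assumes a metastore Parquet table `t` with an `id`
    // column that gets converted to a Parquet relation on read.
    // Without this fix, `a.id = b.id` analyzes to something like
    // `id#3 = id#3`, turning the equi-join into a cross product.
    sqlContext.sql("SELECT * FROM t a JOIN t b ON a.id = b.id")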

Author: Michael Armbrust <mi...@databricks.com>

Closes #5458 from marmbrus/selfJoinParquet and squashes the following commits:

22df77c [Michael Armbrust] [SPARK-6851][SQL] Create new instance for each converted parquet relation


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/23d5f886
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/23d5f886
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/23d5f886

Branch: refs/heads/master
Commit: 23d5f8864f7d665a74b1d38118700139854dbb1c
Parents: 68ecdb7
Author: Michael Armbrust <mi...@databricks.com>
Authored: Fri Apr 10 16:05:14 2015 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Fri Apr 10 16:05:14 2015 -0700

----------------------------------------------------------------------
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  4 +-
 .../sql/hive/execution/SQLQuerySuite.scala      | 78 +++++++++++++++++++-
 2 files changed, 80 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/23d5f886/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 315fab6..3ed5c5b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -279,7 +279,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
       }
     }
 
-    if (metastoreRelation.hiveQlTable.isPartitioned) {
+    val result = if (metastoreRelation.hiveQlTable.isPartitioned) {
       val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys)
       val partitionColumnDataTypes = partitionSchema.map(_.dataType)
       val partitions = metastoreRelation.hiveQlPartitions.map { p =>
@@ -314,6 +314,8 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
 
       parquetRelation
     }
+
+    result.newInstance()
   }
 
   override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = synchronized {
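
[Editorial note] For context on the `result.newInstance()` call above: the converted relation is a LogicalRelation, which mixes in Catalyst's MultiInstanceRelation trait, and `newInstance()` produces a copy whose output attributes are re-materialized with fresh expression IDs, so two occurrences of the same table inside one query plan stay distinguishable. A self-contained sketch of the mechanism, with stub types standing in for Catalyst's AttributeReference and exprId (simplified, not the verbatim Spark source):

    // Sketch only: AttributeRef, Relation, and the id counter are
    // simplified stand-ins for Catalyst's AttributeReference/exprId.
    object NewInstanceSketch {
      private var nextId = 0L
      case class AttributeRef(name: String, exprId: Long)

      trait MultiInstanceRelation {
        def newInstance(): MultiInstanceRelation
      }

      case class Relation(columns: Seq[String]) extends MultiInstanceRelation {
        // Output attributes get fresh IDs per instance, so the two sides
        // of a self-join can never alias each other.
        val output: Seq[AttributeRef] =
          columns.map { c => nextId += 1; AttributeRef(c, nextId) }
        override def newInstance(): Relation = Relation(columns)
      }
    }

Calling `Relation(Seq("id")).newInstance()` twice yields outputs with distinct exprIds, which is the property the patch restores for converted Parquet relations.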

http://git-wip-us.apache.org/repos/asf/spark/blob/23d5f886/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 7811bd2..4c369c0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -34,6 +34,17 @@ case class Nested3(f3: Int)
 case class NestedArray2(b: Seq[Int])
 case class NestedArray1(a: NestedArray2)
 
+case class Order(
+    id: Int,
+    make: String,
+    `type`: String,
+    price: Int,
+    pdate: String,
+    customer: String,
+    city: String,
+    state: String,
+    month: Int)
+
 /**
  * A collection of hive query tests where we generate the answers ourselves instead of depending on
  * Hive to generate them (in contrast to HiveQuerySuite).  Often this is because the query is
@@ -41,6 +52,72 @@ case class NestedArray1(a: NestedArray2)
  */
 class SQLQuerySuite extends QueryTest {
 
+  test("SPARK-6851: Self-joined converted parquet tables") {
+    val orders = Seq(
+      Order(1, "Atlas", "MTB", 234, "2015-01-07", "John D", "Pacifica", "CA", 20151),
+      Order(3, "Swift", "MTB", 285, "2015-01-17", "John S", "Redwood City", "CA", 20151),
+      Order(4, "Atlas", "Hybrid", 303, "2015-01-23", "Jones S", "San Mateo", "CA", 20151),
+      Order(7, "Next", "MTB", 356, "2015-01-04", "Jane D", "Daly City", "CA", 20151),
+      Order(10, "Next", "YFlikr", 187, "2015-01-09", "John D", "Fremont", "CA", 20151),
+      Order(11, "Swift", "YFlikr", 187, "2015-01-23", "John D", "Hayward", "CA", 20151),
+      Order(2, "Next", "Hybrid", 324, "2015-02-03", "Jane D", "Daly City", "CA", 20152),
+      Order(5, "Next", "Street", 187, "2015-02-08", "John D", "Fremont", "CA", 20152),
+      Order(6, "Atlas", "Street", 154, "2015-02-09", "John D", "Pacifica", "CA", 20152),
+      Order(8, "Swift", "Hybrid", 485, "2015-02-19", "John S", "Redwood City", "CA", 20152),
+      Order(9, "Atlas", "Split", 303, "2015-02-28", "Jones S", "San Mateo", "CA", 20152))
+
+    val orderUpdates = Seq(
+      Order(1, "Atlas", "MTB", 434, "2015-01-07", "John D", "Pacifica", "CA", 20151),
+      Order(11, "Swift", "YFlikr", 137, "2015-01-23", "John D", "Hayward", "CA", 20151))
+
+    orders.toDF.registerTempTable("orders1")
+    orderUpdates.toDF.registerTempTable("orderupdates1")
+
+    sql(
+      """CREATE TABLE orders(
+        |  id INT,
+        |  make String,
+        |  type String,
+        |  price INT,
+        |  pdate String,
+        |  customer String,
+        |  city String)
+        |PARTITIONED BY (state STRING, month INT)
+        |STORED AS PARQUET
+      """.stripMargin)
+
+    sql(
+      """CREATE TABLE orderupdates(
+        |  id INT,
+        |  make String,
+        |  type String,
+        |  price INT,
+        |  pdate String,
+        |  customer String,
+        |  city String)
+        |PARTITIONED BY (state STRING, month INT)
+        |STORED AS PARQUET
+      """.stripMargin)
+
+    sql("set hive.exec.dynamic.partition.mode=nonstrict")
+    sql("INSERT INTO TABLE orders PARTITION(state, month) SELECT * FROM orders1")
+    sql("INSERT INTO TABLE orderupdates PARTITION(state, month) SELECT * FROM orderupdates1")
+
+    checkAnswer(
+      sql(
+        """
+          |select orders.state, orders.month
+          |from orders
+          |join (
+          |  select distinct orders.state,orders.month
+          |  from orders
+          |  join orderupdates
+          |    on orderupdates.id = orders.id) ao
+          |  on ao.state = orders.state and ao.month = orders.month
+        """.stripMargin),
+      (1 to 6).map(_ => Row("CA", 20151)))
+  }
+
   test("SPARK-5371: union with null and sum") {
     val df = Seq((1, 1)).toDF("c1", "c2")
     df.registerTempTable("table1")
@@ -478,5 +555,4 @@ class SQLQuerySuite extends QueryTest {
     sql("select d from dn union all select d * 2 from dn")
       .queryExecution.analyzed
   }
-
 }
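
[Editorial note] On the new test's expected answer: both `orderupdates` rows (ids 1 and 11) fall in month 20151, so the inner DISTINCT subquery produces the single row ("CA", 20151); joining that back against `orders` matches exactly the six month-20151 orders, hence `(1 to 6).map(_ => Row("CA", 20151))`. Before this fix, the self-referential join on `orders` would rewrite its predicates into the tautologies described in the commit message.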

