You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2015/09/09 01:16:53 UTC

spark git commit: [SPARK-10327] [SQL] Cache Table is not working while subquery has alias in its project list

Repository: spark
Updated Branches:
  refs/heads/master 52b24a602 -> d637a666d


[SPARK-10327] [SQL] Cache Table is not working while subquery has alias in its project list

```scala
    import org.apache.spark.sql.hive.execution.HiveTableScan
    sql("select key, value, key + 1 from src").registerTempTable("abc")
    cacheTable("abc")

    val sparkPlan = sql(
      """select a.key, b.key, c.key from
        |abc a join abc b on a.key=b.key
        |join abc c on a.key=c.key""".stripMargin).queryExecution.sparkPlan

    assert(sparkPlan.collect { case e: InMemoryColumnarTableScan => e }.size === 3) // failed
    assert(sparkPlan.collect { case e: HiveTableScan => e }.size === 0) // failed
```

The actual plan is:

```
== Parsed Logical Plan ==
'Project [unresolvedalias('a.key),unresolvedalias('b.key),unresolvedalias('c.key)]
 'Join Inner, Some(('a.key = 'c.key))
  'Join Inner, Some(('a.key = 'b.key))
   'UnresolvedRelation [abc], Some(a)
   'UnresolvedRelation [abc], Some(b)
  'UnresolvedRelation [abc], Some(c)

== Analyzed Logical Plan ==
key: int, key: int, key: int
Project [key#14,key#61,key#66]
 Join Inner, Some((key#14 = key#66))
  Join Inner, Some((key#14 = key#61))
   Subquery a
    Subquery abc
     Project [key#14,value#15,(key#14 + 1) AS _c2#16]
      MetastoreRelation default, src, None
   Subquery b
    Subquery abc
     Project [key#61,value#62,(key#61 + 1) AS _c2#58]
      MetastoreRelation default, src, None
  Subquery c
   Subquery abc
    Project [key#66,value#67,(key#66 + 1) AS _c2#63]
     MetastoreRelation default, src, None

== Optimized Logical Plan ==
Project [key#14,key#61,key#66]
 Join Inner, Some((key#14 = key#66))
  Project [key#14,key#61]
   Join Inner, Some((key#14 = key#61))
    Project [key#14]
     InMemoryRelation [key#14,value#15,_c2#16], true, 10000, StorageLevel(true, true, false, true, 1), (Project [key#14,value#15,(key#14 + 1) AS _c2#16]), Some(abc)
    Project [key#61]
     MetastoreRelation default, src, None
  Project [key#66]
   MetastoreRelation default, src, None

== Physical Plan ==
TungstenProject [key#14,key#61,key#66]
 BroadcastHashJoin [key#14], [key#66], BuildRight
  TungstenProject [key#14,key#61]
   BroadcastHashJoin [key#14], [key#61], BuildRight
    ConvertToUnsafe
     InMemoryColumnarTableScan [key#14], (InMemoryRelation [key#14,value#15,_c2#16], true, 10000, StorageLevel(true, true, false, true, 1), (Project [key#14,value#15,(key#14 + 1) AS _c2#16]), Some(abc))
    ConvertToUnsafe
     HiveTableScan [key#61], (MetastoreRelation default, src, None)
  ConvertToUnsafe
   HiveTableScan [key#66], (MetastoreRelation default, src, None)
```

Author: Cheng Hao <ha...@intel.com>

Closes #8494 from chenghao-intel/weird_cache.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d637a666
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d637a666
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d637a666

Branch: refs/heads/master
Commit: d637a666d5932002c8ce0bd23c06064fbfdc1c97
Parents: 52b24a6
Author: Cheng Hao <ha...@intel.com>
Authored: Tue Sep 8 16:16:50 2015 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Tue Sep 8 16:16:50 2015 -0700

----------------------------------------------------------------------
 .../sql/catalyst/plans/logical/LogicalPlan.scala    | 15 ++++++++++++---
 .../org/apache/spark/sql/CachedTableSuite.scala     | 16 ++++++++++++++++
 2 files changed, 28 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d637a666/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 9bb466a..8f8747e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -135,16 +135,25 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
   /** Args that have cleaned such that differences in expression id should not affect equality */
   protected lazy val cleanArgs: Seq[Any] = {
     val input = children.flatMap(_.output)
+    def cleanExpression(e: Expression) = e match {
+      case a: Alias =>
+        // As the root of the expression, Alias will always take an arbitrary exprId, we need
+        // to erase that for equality testing.
+        val cleanedExprId = Alias(a.child, a.name)(ExprId(-1), a.qualifiers)
+        BindReferences.bindReference(cleanedExprId, input, allowFailures = true)
+      case other => BindReferences.bindReference(other, input, allowFailures = true)
+    }
+
     productIterator.map {
       // Children are checked using sameResult above.
       case tn: TreeNode[_] if containsChild(tn) => null
-      case e: Expression => BindReferences.bindReference(e, input, allowFailures = true)
+      case e: Expression => cleanExpression(e)
       case s: Option[_] => s.map {
-        case e: Expression => BindReferences.bindReference(e, input, allowFailures = true)
+        case e: Expression => cleanExpression(e)
         case other => other
       }
       case s: Seq[_] => s.map {
-        case e: Expression => BindReferences.bindReference(e, input, allowFailures = true)
+        case e: Expression => cleanExpression(e)
         case other => other
       }
       case other => other

http://git-wip-us.apache.org/repos/asf/spark/blob/d637a666/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 3a3541a..84e66b5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql
 
+import org.apache.spark.sql.execution.PhysicalRDD
+
 import scala.concurrent.duration._
 import scala.language.postfixOps
 
@@ -338,4 +340,18 @@ class CachedTableSuite extends QueryTest with SharedSQLContext {
       assert((accsSize - 2) == Accumulators.originals.size)
     }
   }
+
+  test("SPARK-10327 Cache Table is not working while subquery has alias in its project list") {
+    ctx.sparkContext.parallelize((1, 1) :: (2, 2) :: Nil)
+      .toDF("key", "value").selectExpr("key", "value", "key+1").registerTempTable("abc")
+    ctx.cacheTable("abc")
+
+    val sparkPlan = sql(
+      """select a.key, b.key, c.key from
+        |abc a join abc b on a.key=b.key
+        |join abc c on a.key=c.key""".stripMargin).queryExecution.sparkPlan
+
+    assert(sparkPlan.collect { case e: InMemoryColumnarTableScan => e }.size === 3)
+    assert(sparkPlan.collect { case e: PhysicalRDD => e }.size === 0)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org