You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2015/09/09 01:16:53 UTC
spark git commit: [SPARK-10327] [SQL] Cache Table is not working
while subquery has alias in its project list
Repository: spark
Updated Branches:
refs/heads/master 52b24a602 -> d637a666d
[SPARK-10327] [SQL] Cache Table is not working while subquery has alias in its project list
```scala
import org.apache.spark.sql.hive.execution.HiveTableScan
sql("select key, value, key + 1 from src").registerTempTable("abc")
cacheTable("abc")
val sparkPlan = sql(
"""select a.key, b.key, c.key from
|abc a join abc b on a.key=b.key
|join abc c on a.key=c.key""".stripMargin).queryExecution.sparkPlan
assert(sparkPlan.collect { case e: InMemoryColumnarTableScan => e }.size === 3) // failed
assert(sparkPlan.collect { case e: HiveTableScan => e }.size === 0) // failed
```
The actual plan is:
```
== Parsed Logical Plan ==
'Project [unresolvedalias('a.key),unresolvedalias('b.key),unresolvedalias('c.key)]
'Join Inner, Some(('a.key = 'c.key))
'Join Inner, Some(('a.key = 'b.key))
'UnresolvedRelation [abc], Some(a)
'UnresolvedRelation [abc], Some(b)
'UnresolvedRelation [abc], Some(c)
== Analyzed Logical Plan ==
key: int, key: int, key: int
Project [key#14,key#61,key#66]
Join Inner, Some((key#14 = key#66))
Join Inner, Some((key#14 = key#61))
Subquery a
Subquery abc
Project [key#14,value#15,(key#14 + 1) AS _c2#16]
MetastoreRelation default, src, None
Subquery b
Subquery abc
Project [key#61,value#62,(key#61 + 1) AS _c2#58]
MetastoreRelation default, src, None
Subquery c
Subquery abc
Project [key#66,value#67,(key#66 + 1) AS _c2#63]
MetastoreRelation default, src, None
== Optimized Logical Plan ==
Project [key#14,key#61,key#66]
Join Inner, Some((key#14 = key#66))
Project [key#14,key#61]
Join Inner, Some((key#14 = key#61))
Project [key#14]
InMemoryRelation [key#14,value#15,_c2#16], true, 10000, StorageLevel(true, true, false, true, 1), (Project [key#14,value#15,(key#14 + 1) AS _c2#16]), Some(abc)
Project [key#61]
MetastoreRelation default, src, None
Project [key#66]
MetastoreRelation default, src, None
== Physical Plan ==
TungstenProject [key#14,key#61,key#66]
BroadcastHashJoin [key#14], [key#66], BuildRight
TungstenProject [key#14,key#61]
BroadcastHashJoin [key#14], [key#61], BuildRight
ConvertToUnsafe
InMemoryColumnarTableScan [key#14], (InMemoryRelation [key#14,value#15,_c2#16], true, 10000, StorageLevel(true, true, false, true, 1), (Project [key#14,value#15,(key#14 + 1) AS _c2#16]), Some(abc))
ConvertToUnsafe
HiveTableScan [key#61], (MetastoreRelation default, src, None)
ConvertToUnsafe
HiveTableScan [key#66], (MetastoreRelation default, src, None)
```
Author: Cheng Hao <ha...@intel.com>
Closes #8494 from chenghao-intel/weird_cache.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d637a666
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d637a666
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d637a666
Branch: refs/heads/master
Commit: d637a666d5932002c8ce0bd23c06064fbfdc1c97
Parents: 52b24a6
Author: Cheng Hao <ha...@intel.com>
Authored: Tue Sep 8 16:16:50 2015 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Tue Sep 8 16:16:50 2015 -0700
----------------------------------------------------------------------
.../sql/catalyst/plans/logical/LogicalPlan.scala | 15 ++++++++++++---
.../org/apache/spark/sql/CachedTableSuite.scala | 16 ++++++++++++++++
2 files changed, 28 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/d637a666/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 9bb466a..8f8747e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -135,16 +135,25 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
/** Args that have cleaned such that differences in expression id should not affect equality */
protected lazy val cleanArgs: Seq[Any] = {
val input = children.flatMap(_.output)
+ def cleanExpression(e: Expression) = e match {
+ case a: Alias =>
+ // As the root of the expression, Alias will always take an arbitrary exprId, we need
+ // to erase that for equality testing.
+ val cleanedExprId = Alias(a.child, a.name)(ExprId(-1), a.qualifiers)
+ BindReferences.bindReference(cleanedExprId, input, allowFailures = true)
+ case other => BindReferences.bindReference(other, input, allowFailures = true)
+ }
+
productIterator.map {
// Children are checked using sameResult above.
case tn: TreeNode[_] if containsChild(tn) => null
- case e: Expression => BindReferences.bindReference(e, input, allowFailures = true)
+ case e: Expression => cleanExpression(e)
case s: Option[_] => s.map {
- case e: Expression => BindReferences.bindReference(e, input, allowFailures = true)
+ case e: Expression => cleanExpression(e)
case other => other
}
case s: Seq[_] => s.map {
- case e: Expression => BindReferences.bindReference(e, input, allowFailures = true)
+ case e: Expression => cleanExpression(e)
case other => other
}
case other => other
http://git-wip-us.apache.org/repos/asf/spark/blob/d637a666/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 3a3541a..84e66b5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql
+import org.apache.spark.sql.execution.PhysicalRDD
+
import scala.concurrent.duration._
import scala.language.postfixOps
@@ -338,4 +340,18 @@ class CachedTableSuite extends QueryTest with SharedSQLContext {
assert((accsSize - 2) == Accumulators.originals.size)
}
}
+
+ test("SPARK-10327 Cache Table is not working while subquery has alias in its project list") {
+ ctx.sparkContext.parallelize((1, 1) :: (2, 2) :: Nil)
+ .toDF("key", "value").selectExpr("key", "value", "key+1").registerTempTable("abc")
+ ctx.cacheTable("abc")
+
+ val sparkPlan = sql(
+ """select a.key, b.key, c.key from
+ |abc a join abc b on a.key=b.key
+ |join abc c on a.key=c.key""".stripMargin).queryExecution.sparkPlan
+
+ assert(sparkPlan.collect { case e: InMemoryColumnarTableScan => e }.size === 3)
+ assert(sparkPlan.collect { case e: PhysicalRDD => e }.size === 0)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org