Posted to commits@spark.apache.org by hv...@apache.org on 2017/01/08 22:09:13 UTC

spark git commit: [SPARK-19093][SQL] Cached tables are not used in SubqueryExpression

Repository: spark
Updated Branches:
  refs/heads/master cd1d00ada -> 4351e6220


[SPARK-19093][SQL] Cached tables are not used in SubqueryExpression

## What changes were proposed in this pull request?
Consider the plans inside subquery expressions when looking up the cache manager, so that
cached data is used for them as well. Currently `CacheManager.useCachedData` does not
consider the subquery expressions in the plan.
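
In essence, the fix first rewrites the top-level plan as before and then recurses into each subquery expression's plan, since `transformDown` only visits the operator tree and never the plans nested inside expressions. A minimal sketch of the idea (it mirrors the actual change in the diff below; `lookupCachedData` is an existing `CacheManager` helper):
```scala
import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Sketch only: this method lives inside CacheManager, where lookupCachedData
// resolves a plan fragment to its cached representation if one exists.
def useCachedData(plan: LogicalPlan): LogicalPlan = {
  // Step 1: replace cached fragments in the operator tree, as before.
  val newPlan = plan transformDown {
    case currentFragment =>
      lookupCachedData(currentFragment)
        .map(_.cachedRepresentation.withOutput(currentFragment.output))
        .getOrElse(currentFragment)
  }
  // Step 2 (the fix): walk all expressions and recurse into each subquery's
  // plan, so cached relations are substituted there as well.
  newPlan transformAllExpressions {
    case s: SubqueryExpression => s.withNewPlan(useCachedData(s.plan))
  }
}
```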

Query 1
```
select * from rows where not exists (select * from rows)
```
Before the fix
```
== Optimized Logical Plan ==
Join LeftAnti
:- InMemoryRelation [_1#3775, _2#3776], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
:     +- *FileScan parquet [_1#3775,_2#3776] Batched: true, Format: Parquet, Location: InMemoryFileIndex[dbfs:/tmp/rows], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_1:string,_2:string>
+- Project [_1#3775 AS _1#3775#4001, _2#3776 AS _2#3776#4002]
   +- Relation[_1#3775,_2#3776] parquet
```

After
```
== Optimized Logical Plan ==
Join LeftAnti
:- InMemoryRelation [_1#256, _2#257], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
:     +- *FileScan parquet [_1#256,_2#257] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/tmp/rows], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_1:string,_2:string>
+- Project [_1#256 AS _1#256#298, _2#257 AS _2#257#299]
   +- InMemoryRelation [_1#256, _2#257], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *FileScan parquet [_1#256,_2#257] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/tmp/rows], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_1:string,_2:string>
```
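
For reference, a sketch of how these plans can be produced (the `/tmp/rows` parquet path follows the `Location` shown above; the row contents and the `local[*]` session are assumptions):
```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("SPARK-19093").getOrCreate()
import spark.implicits._

// Write a tiny parquet table and register it as the view "rows".
Seq(("a", "b"), ("c", "d")).toDF("_1", "_2").write.mode("overwrite").parquet("/tmp/rows")
spark.read.parquet("/tmp/rows").createOrReplaceTempView("rows")
spark.catalog.cacheTable("rows")

// Before the fix only the outer scan is an InMemoryRelation; after it, the
// scan inside the NOT EXISTS subquery is one as well.
println(spark.sql("select * from rows where not exists (select * from rows)")
  .queryExecution.optimizedPlan)
```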

Query 2
```
 SELECT * FROM t1
 WHERE
 c1 IN (SELECT c1 FROM t2 WHERE c1 IN (SELECT c1 FROM t3 WHERE c1 = 1))
```
Before
```
== Analyzed Logical Plan ==
c1: int
Project [c1#3]
+- Filter predicate-subquery#47 [(c1#3 = c1#10)]
   :  +- Project [c1#10]
   :     +- Filter predicate-subquery#46 [(c1#10 = c1#17)]
   :        :  +- Project [c1#17]
   :        :     +- Filter (c1#17 = 1)
   :        :        +- SubqueryAlias t3, `t3`
   :        :           +- Project [value#15 AS c1#17]
   :        :              +- LocalRelation [value#15]
   :        +- SubqueryAlias t2, `t2`
   :           +- Project [value#8 AS c1#10]
   :              +- LocalRelation [value#8]
   +- SubqueryAlias t1, `t1`
      +- Project [value#1 AS c1#3]
         +- LocalRelation [value#1]

== Optimized Logical Plan ==
Join LeftSemi, (c1#3 = c1#10)
:- InMemoryRelation [c1#3], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), t1
:     +- LocalTableScan [c1#3]
+- Project [value#8 AS c1#10]
   +- Join LeftSemi, (value#8 = c1#17)
      :- LocalRelation [value#8]
      +- Project [value#15 AS c1#17]
         +- Filter (value#15 = 1)
            +- LocalRelation [value#15]

```
After
```
== Analyzed Logical Plan ==
c1: int
Project [c1#3]
+- Filter predicate-subquery#47 [(c1#3 = c1#10)]
   :  +- Project [c1#10]
   :     +- Filter predicate-subquery#46 [(c1#10 = c1#17)]
   :        :  +- Project [c1#17]
   :        :     +- Filter (c1#17 = 1)
   :        :        +- SubqueryAlias t3, `t3`
   :        :           +- Project [value#15 AS c1#17]
   :        :              +- LocalRelation [value#15]
   :        +- SubqueryAlias t2, `t2`
   :           +- Project [value#8 AS c1#10]
   :              +- LocalRelation [value#8]
   +- SubqueryAlias t1, `t1`
      +- Project [value#1 AS c1#3]
         +- LocalRelation [value#1]

== Optimized Logical Plan ==
Join LeftSemi, (c1#3 = c1#10)
:- InMemoryRelation [c1#3], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), t1
:     +- LocalTableScan [c1#3]
+- Join LeftSemi, (c1#10 = c1#17)
   :- InMemoryRelation [c1#10], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), t2
   :     +- LocalTableScan [c1#10]
   +- Filter (c1#17 = 1)
      +- InMemoryRelation [c1#17], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), t3
            +- LocalTableScan [c1#17]
```
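
The setup assumed by Query 2 is sketched below (it mirrors the new tests added in this patch; the single-row contents are assumptions):
```scala
import spark.implicits._

Seq(1).toDF("c1").createOrReplaceTempView("t1")
Seq(1).toDF("c1").createOrReplaceTempView("t2")
Seq(1).toDF("c1").createOrReplaceTempView("t3")
spark.catalog.cacheTable("t1")
spark.catalog.cacheTable("t2")
spark.catalog.cacheTable("t3")
```
With all three views cached, the optimized plan above contains an `InMemoryRelation` for each of t1, t2, and t3.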
## How was this patch tested?
Added new tests in CachedTableSuite.

Author: Dilip Biswal <db...@us.ibm.com>

Closes #16493 from dilipbiswal/SPARK-19093.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4351e622
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4351e622
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4351e622

Branch: refs/heads/master
Commit: 4351e62207957bec663108a571cff2bfaaa9e7d5
Parents: cd1d00a
Author: Dilip Biswal <db...@us.ibm.com>
Authored: Sun Jan 8 23:09:07 2017 +0100
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Sun Jan 8 23:09:07 2017 +0100

----------------------------------------------------------------------
 .../spark/sql/execution/CacheManager.scala      |  7 +-
 .../org/apache/spark/sql/CachedTableSuite.scala | 72 ++++++++++++++++++++
 2 files changed, 78 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4351e622/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 526623a..4ca1347 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.Dataset
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
@@ -131,12 +132,16 @@ class CacheManager extends Logging {
 
   /** Replaces segments of the given logical plan with cached versions where possible. */
   def useCachedData(plan: LogicalPlan): LogicalPlan = {
-    plan transformDown {
+    val newPlan = plan transformDown {
       case currentFragment =>
         lookupCachedData(currentFragment)
           .map(_.cachedRepresentation.withOutput(currentFragment.output))
           .getOrElse(currentFragment)
     }
+
+    newPlan transformAllExpressions {
+      case s: SubqueryExpression => s.withNewPlan(useCachedData(s.plan))
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/4351e622/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index f42402e..fb4812a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -24,6 +24,8 @@ import scala.language.postfixOps
 import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark.CleanerListener
+import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.RDDScanExec
 import org.apache.spark.sql.execution.columnar._
 import org.apache.spark.sql.execution.exchange.ShuffleExchange
@@ -37,6 +39,14 @@ private case class BigData(s: String)
 class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext {
   import testImplicits._
 
+  override def afterEach(): Unit = {
+    try {
+      spark.catalog.clearCache()
+    } finally {
+      super.afterEach()
+    }
+  }
+
   def rddIdOf(tableName: String): Int = {
     val plan = spark.table(tableName).queryExecution.sparkPlan
     plan.collect {
@@ -53,6 +63,16 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
     maybeBlock.nonEmpty
   }
 
+  private def getNumInMemoryRelations(plan: LogicalPlan): Int = {
+    var sum = plan.collect { case _: InMemoryRelation => 1 }.sum
+    plan.transformAllExpressions {
+      case e: SubqueryExpression =>
+        sum += getNumInMemoryRelations(e.plan)
+        e
+    }
+    sum
+  }
+
   test("withColumn doesn't invalidate cached dataframe") {
     var evalCount = 0
     val myUDF = udf((x: String) => { evalCount += 1; "result" })
@@ -565,4 +585,56 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
         case i: InMemoryRelation => i
       }.size == 1)
   }
+
+  test("SPARK-19093 Caching in side subquery") {
+    withTempView("t1") {
+      Seq(1).toDF("c1").createOrReplaceTempView("t1")
+      spark.catalog.cacheTable("t1")
+      val cachedPlan =
+        sql(
+          """
+            |SELECT * FROM t1
+            |WHERE
+            |NOT EXISTS (SELECT * FROM t1)
+          """.stripMargin).queryExecution.optimizedPlan
+      assert(getNumInMemoryRelations(cachedPlan) == 2)
+    }
+  }
+
+  test("SPARK-19093 scalar and nested predicate query") {
+    withTempView("t1", "t2", "t3", "t4") {
+      Seq(1).toDF("c1").createOrReplaceTempView("t1")
+      Seq(2).toDF("c1").createOrReplaceTempView("t2")
+      Seq(1).toDF("c1").createOrReplaceTempView("t3")
+      Seq(1).toDF("c1").createOrReplaceTempView("t4")
+      spark.catalog.cacheTable("t1")
+      spark.catalog.cacheTable("t2")
+      spark.catalog.cacheTable("t3")
+      spark.catalog.cacheTable("t4")
+
+      // Nested predicate subquery
+      val cachedPlan =
+        sql(
+        """
+          |SELECT * FROM t1
+          |WHERE
+          |c1 IN (SELECT c1 FROM t2 WHERE c1 IN (SELECT c1 FROM t3 WHERE c1 = 1))
+        """.stripMargin).queryExecution.optimizedPlan
+      assert(getNumInMemoryRelations(cachedPlan) == 3)
+
+      // Scalar subquery and predicate subquery
+      val cachedPlan2 =
+        sql(
+          """
+            |SELECT * FROM (SELECT max(c1) FROM t1 GROUP BY c1)
+            |WHERE
+            |c1 = (SELECT max(c1) FROM t2 GROUP BY c1)
+            |OR
+            |EXISTS (SELECT c1 FROM t3)
+            |OR
+            |c1 IN (SELECT c1 FROM t4)
+          """.stripMargin).queryExecution.optimizedPlan
+      assert(getNumInMemoryRelations(cachedPlan2) == 4)
+    }
+  }
 }

