Posted to commits@spark.apache.org by rx...@apache.org on 2016/02/21 21:27:04 UTC

spark git commit: [SPARK-13306][SQL] Addendum to uncorrelated scalar subquery

Repository: spark
Updated Branches:
  refs/heads/master 0947f0989 -> af441ddbd


[SPARK-13306][SQL] Addendum to uncorrelated scalar subquery

## What changes were proposed in this pull request?
This pull request fixes some minor issues (documentation, test flakiness, test organization) with #11190, which was merged earlier tonight.
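
For context, an uncorrelated scalar subquery is a subquery that returns at most one row and exactly one column and does not reference the outer query. A minimal sketch of the feature, assuming an active `SQLContext` named `sqlContext` (mirroring the queries in SubquerySuite below):

```scala
// The inner query is independent of the outer one and yields a single value,
// so it can be used anywhere an expression is allowed.
val df = sqlContext.sql(
  "select (select max(b) from (select 1 as b union all select 2 as b) t) + 1")
df.collect()  // Array(Row(3))
```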

## How was this patch tested?
Unit tests.

Author: Reynold Xin <rx...@databricks.com>

Closes #11285 from rxin/subquery.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/af441ddb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/af441ddb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/af441ddb

Branch: refs/heads/master
Commit: af441ddbd13f48c09b458c451d7bba3965a878d1
Parents: 0947f09
Author: Reynold Xin <rx...@databricks.com>
Authored: Sun Feb 21 12:27:02 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Sun Feb 21 12:27:02 2016 -0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/Analyzer.scala  |  3 +-
 .../sql/catalyst/expressions/literals.scala     | 11 ----
 .../sql/catalyst/expressions/subquery.scala     |  5 +-
 .../apache/spark/sql/execution/SparkPlan.scala  | 49 +++++++++-------
 .../apache/spark/sql/execution/subquery.scala   |  2 +-
 .../org/apache/spark/sql/SubquerySuite.scala    | 61 ++++++++++----------
 6 files changed, 61 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/af441ddb/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 562711a..23e4709 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -123,13 +123,12 @@ class Analyzer(
           }
           substituted.getOrElse(u)
         case other =>
-          // This can't be done in ResolveSubquery because that does not know the CTE.
+          // This cannot be done in ResolveSubquery because ResolveSubquery does not know the CTE.
           other transformExpressions {
             case e: SubqueryExpression =>
               e.withNewPlan(substituteCTE(e.query, cteRelations))
           }
       }
-
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/af441ddb/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
index d7d768b..37bfe98 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
@@ -255,14 +255,3 @@ case class Literal protected (value: Any, dataType: DataType)
     case _ => value.toString
   }
 }
-
-// TODO: Specialize
-case class MutableLiteral(var value: Any, dataType: DataType, nullable: Boolean = true)
-  extends LeafExpression with CodegenFallback {
-
-  def update(expression: Expression, input: InternalRow): Unit = {
-    value = expression.eval(input)
-  }
-
-  override def eval(input: InternalRow): Any = value
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/af441ddb/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
index d0c44b0..ddf214a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
@@ -45,9 +45,8 @@ abstract class SubqueryExpression extends LeafExpression {
 }
 
 /**
- * A subquery that will return only one row and one column.
- *
- * This will be converted into [[execution.ScalarSubquery]] during physical planning.
+ * A subquery that will return only one row and one column. This will be converted into a physical
+ * scalar subquery during planning.
  *
  * Note: `exprId` is used to have unique name in explain string output.
  */

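The rewritten comment above captures the contract concisely. A quick sketch of the observable semantics, assuming an active `SQLContext` named `sqlContext` (these mirror cases exercised in SubquerySuite below):

```scala
// Exactly one row: the subquery evaluates to that row's single value.
sqlContext.sql("select (select 's' as s) as b").collect()          // Array(Row("s"))

// Zero rows: the subquery evaluates to null.
sqlContext.sql("select (select 's' as s limit 0) as b").collect()  // Array(Row(null))

// More than one row: a runtime error is raised during execution
// ("more than one row returned by a subquery used as an expression").
```
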
http://git-wip-us.apache.org/repos/asf/spark/blob/af441ddb/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 872ccde..477a946 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -46,7 +46,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
    * populated by the query planning infrastructure.
    */
   @transient
-  protected[spark] final val sqlContext = SQLContext.getActive().getOrElse(null)
+  protected[spark] final val sqlContext = SQLContext.getActive().orNull
 
   protected def sparkContext = sqlContext.sparkContext
 
@@ -120,44 +120,49 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     }
   }
 
-  // All the subqueries and their Future of results.
-  @transient private val queryResults = ArrayBuffer[(ScalarSubquery, Future[Array[InternalRow]])]()
+  /**
+   * List of (uncorrelated scalar subquery, future holding the subquery result) for this plan node.
+   * This list is populated by [[prepareSubqueries]], which is called in [[prepare]].
+   */
+  @transient
+  private val subqueryResults = new ArrayBuffer[(ScalarSubquery, Future[Array[InternalRow]])]
 
   /**
-   * Collects all the subqueries and create a Future to take the first two rows of them.
+   * Finds scalar subquery expressions in this plan node and starts evaluating them.
+   * The subqueries found are added to [[subqueryResults]].
    */
   protected def prepareSubqueries(): Unit = {
     val allSubqueries = expressions.flatMap(_.collect {case e: ScalarSubquery => e})
     allSubqueries.asInstanceOf[Seq[ScalarSubquery]].foreach { e =>
       val futureResult = Future {
-        // We only need the first row, try to take two rows so we can throw an exception if there
-        // are more than one rows returned.
+        // Each subquery should return only one row (and one column). We take two here so we can
+        // throw an exception later if more than one row is returned.
         e.executedPlan.executeTake(2)
       }(SparkPlan.subqueryExecutionContext)
-      queryResults += e -> futureResult
+      subqueryResults += e -> futureResult
     }
   }
 
   /**
-   * Waits for all the subqueries to finish and updates the results.
+   * Blocks the thread until all subqueries finish evaluation, then updates the results.
    */
   protected def waitForSubqueries(): Unit = {
     // fill in the result of subqueries
-    queryResults.foreach {
-      case (e, futureResult) =>
-        val rows = Await.result(futureResult, Duration.Inf)
-        if (rows.length > 1) {
-          sys.error(s"more than one row returned by a subquery used as an expression:\n${e.plan}")
-        }
-        if (rows.length == 1) {
-          assert(rows(0).numFields == 1, "Analyzer should make sure this only returns one column")
-          e.updateResult(rows(0).get(0, e.dataType))
-        } else {
-          // There is no rows returned, the result should be null.
-          e.updateResult(null)
-        }
+    subqueryResults.foreach { case (e, futureResult) =>
+      val rows = Await.result(futureResult, Duration.Inf)
+      if (rows.length > 1) {
+        sys.error(s"more than one row returned by a subquery used as an expression:\n${e.plan}")
+      }
+      if (rows.length == 1) {
+        assert(rows(0).numFields == 1,
+          s"Expects 1 field, but got ${rows(0).numFields}; something went wrong in analysis")
+        e.updateResult(rows(0).get(0, e.dataType))
+      } else {
+      // If no rows are returned, the result should be null.
+        e.updateResult(null)
+      }
     }
-    queryResults.clear()
+    subqueryResults.clear()
   }
 
   /**

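The prepare/wait split above is a fork/join pattern: every subquery is kicked off on a dedicated execution context so the subqueries run concurrently, and the node blocks exactly once for all results before executing. A standalone sketch of the same pattern, with a hypothetical `runSubquery` standing in for `executedPlan.executeTake(2)`:

```scala
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object SubqueryPatternSketch extends App {
  // Hypothetical stand-in for executing a planned subquery and taking two rows.
  def runSubquery(id: Int): Array[Int] = Array(id)

  val subqueryResults = new ArrayBuffer[(Int, Future[Array[Int]])]

  // "prepare": start every subquery without blocking (cf. prepareSubqueries).
  for (id <- 1 to 3) {
    subqueryResults += id -> Future(runSubquery(id))(ExecutionContext.global)
  }

  // "wait": block until all subqueries finish, enforcing the one-row contract
  // (cf. waitForSubqueries).
  subqueryResults.foreach { case (id, future) =>
    val rows = Await.result(future, Duration.Inf)
    if (rows.length > 1) sys.error(s"subquery $id returned more than one row")
    // One row: use rows(0) as the result; zero rows: the result is null.
  }
  subqueryResults.clear()
}
```
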
http://git-wip-us.apache.org/repos/asf/spark/blob/af441ddb/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
index 9c645c7..e6d7480 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
@@ -62,7 +62,7 @@ case class ScalarSubquery(
 /**
  * Convert the subquery from logical plan into executed plan.
  */
-private[sql] case class PlanSubqueries(sqlContext: SQLContext) extends Rule[SparkPlan] {
+case class PlanSubqueries(sqlContext: SQLContext) extends Rule[SparkPlan] {
   def apply(plan: SparkPlan): SparkPlan = {
     plan.transformAllExpressions {
       case subquery: expressions.ScalarSubquery =>

http://git-wip-us.apache.org/repos/asf/spark/blob/af441ddb/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index e851eb0..21b19fe 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -20,65 +20,64 @@ package org.apache.spark.sql
 import org.apache.spark.sql.test.SharedSQLContext
 
 class SubquerySuite extends QueryTest with SharedSQLContext {
+  import testImplicits._
 
   test("simple uncorrelated scalar subquery") {
     assertResult(Array(Row(1))) {
       sql("select (select 1 as b) as b").collect()
     }
 
-    assertResult(Array(Row(1))) {
-      sql("with t2 as (select 1 as b, 2 as c) " +
-        "select a from (select 1 as a union all select 2 as a) t " +
-        "where a = (select max(b) from t2) ").collect()
-    }
-
     assertResult(Array(Row(3))) {
       sql("select (select (select 1) + 1) + 1").collect()
     }
 
-    // more than one columns
-    val error = intercept[AnalysisException] {
-      sql("select (select 1, 2) as b").collect()
-    }
-    assert(error.message contains "Scalar subquery must return only one column, but got 2")
-
-    // more than one rows
-    val error2 = intercept[RuntimeException] {
-      sql("select (select a from (select 1 as a union all select 2 as a) t) as b").collect()
-    }
-    assert(error2.getMessage contains
-      "more than one row returned by a subquery used as an expression")
-
     // string type
     assertResult(Array(Row("s"))) {
       sql("select (select 's' as s) as b").collect()
     }
+  }
 
-    // zero rows
+  test("uncorrelated scalar subquery in CTE") {
+    assertResult(Array(Row(1))) {
+      sql("with t2 as (select 1 as b, 2 as c) " +
+        "select a from (select 1 as a union all select 2 as a) t " +
+        "where a = (select max(b) from t2) ").collect()
+    }
+  }
+
+  test("uncorrelated scalar subquery should return null if there is 0 rows") {
     assertResult(Array(Row(null))) {
       sql("select (select 's' as s limit 0) as b").collect()
     }
   }
 
-  test("uncorrelated scalar subquery on testData") {
-    // initialize test Data
-    testData
+  test("runtime error when the number of rows is greater than 1") {
+    val error2 = intercept[RuntimeException] {
+      sql("select (select a from (select 1 as a union all select 2 as a) t) as b").collect()
+    }
+    assert(error2.getMessage.contains(
+      "more than one row returned by a subquery used as an expression"))
+  }
+
+  test("uncorrelated scalar subquery on a DataFrame generated query") {
+    val df = Seq((1, "one"), (2, "two"), (3, "three")).toDF("key", "value")
+    df.registerTempTable("subqueryData")
 
-    assertResult(Array(Row(5))) {
-      sql("select (select key from testData where key > 3 limit 1) + 1").collect()
+    assertResult(Array(Row(4))) {
+      sql("select (select key from subqueryData where key > 2 order by key limit 1) + 1").collect()
     }
 
-    assertResult(Array(Row(-100))) {
-      sql("select -(select max(key) from testData)").collect()
+    assertResult(Array(Row(-3))) {
+      sql("select -(select max(key) from subqueryData)").collect()
     }
 
     assertResult(Array(Row(null))) {
-      sql("select (select value from testData limit 0)").collect()
+      sql("select (select value from subqueryData limit 0)").collect()
     }
 
-    assertResult(Array(Row("99"))) {
-      sql("select (select min(value) from testData" +
-        " where key = (select max(key) from testData) - 1)").collect()
+    assertResult(Array(Row("two"))) {
+      sql("select (select min(value) from subqueryData" +
+        " where key = (select max(key) from subqueryData) - 1)").collect()
     }
   }
 }

