Posted to commits@spark.apache.org by da...@apache.org on 2016/02/21 06:02:01 UTC

spark git commit: [SPARK-13306] [SQL] uncorrelated scalar subquery

Repository: spark
Updated Branches:
  refs/heads/master f88c641bc -> 792507128


[SPARK-13306] [SQL] uncorrelated scalar subquery

A scalar subquery is a subquery that produces only a single row and a single column, and can be used as part of an expression. An uncorrelated scalar subquery is one that does not reference any table from the outer query.
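
For example (the table names `t` and `s` here are illustrative):

```sql
-- uncorrelated: the inner query does not reference the outer table t
select * from t where a = (select max(b) from s)

-- correlated (not covered by this patch): the inner query references t.a
select * from t where a = (select max(b) from s where s.a = t.a)
```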

All uncorrelated scalar subqueries are executed during prepare() of SparkPlan.

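A minimal Scala sketch of this prepare-then-wait pattern, outside Spark (the names below are illustrative, not the actual SparkPlan API):

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object ScalarSubquerySketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  def main(args: Array[String]): Unit = {
    // prepare(): kick off the subquery asynchronously; taking at most two
    // rows is enough to detect a multi-row subquery later.
    val futureRows: Future[Seq[Int]] = Future { Seq(42).take(2) }

    // execute(): block on the subquery result before running the main plan.
    val rows = Await.result(futureRows, Duration.Inf)
    if (rows.length > 1) {
      sys.error("more than one row returned by a subquery used as an expression")
    }
    // Zero rows means the scalar subquery evaluates to null.
    println(rows.headOption.getOrElse("null"))
  }
}
```
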
The plans for the query
```sql
select 1 + (select 2 + (select 3))
```
look like this:
```
== Parsed Logical Plan ==
'Project [unresolvedalias((1 + subquery#1),None)]
:- OneRowRelation$
+- 'Subquery subquery#1
   +- 'Project [unresolvedalias((2 + subquery#0),None)]
      :- OneRowRelation$
      +- 'Subquery subquery#0
         +- 'Project [unresolvedalias(3,None)]
            +- OneRowRelation$

== Analyzed Logical Plan ==
_c0: int
Project [(1 + subquery#1) AS _c0#4]
:- OneRowRelation$
+- Subquery subquery#1
   +- Project [(2 + subquery#0) AS _c0#3]
      :- OneRowRelation$
      +- Subquery subquery#0
         +- Project [3 AS _c0#2]
            +- OneRowRelation$

== Optimized Logical Plan ==
Project [(1 + subquery#1) AS _c0#4]
:- OneRowRelation$
+- Subquery subquery#1
   +- Project [(2 + subquery#0) AS _c0#3]
      :- OneRowRelation$
      +- Subquery subquery#0
         +- Project [3 AS _c0#2]
            +- OneRowRelation$

== Physical Plan ==
WholeStageCodegen
:  +- Project [(1 + subquery#1) AS _c0#4]
:     :- INPUT
:     +- Subquery subquery#1
:        +- WholeStageCodegen
:           :  +- Project [(2 + subquery#0) AS _c0#3]
:           :     :- INPUT
:           :     +- Subquery subquery#0
:           :        +- WholeStageCodegen
:           :           :  +- Project [3 AS _c0#2]
:           :           :     +- INPUT
:           :           +- Scan OneRowRelation[]
:           +- Scan OneRowRelation[]
+- Scan OneRowRelation[]
```
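
The explain output above can be reproduced with `explain(true)`, assuming a `SQLContext` named `sqlContext` is in scope:

```scala
sqlContext.sql("select 1 + (select 2 + (select 3))").explain(true)
```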

Author: Davies Liu <da...@databricks.com>

Closes #11190 from davies/scalar_subquery.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/79250712
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/79250712
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/79250712

Branch: refs/heads/master
Commit: 7925071280bfa1570435bde3e93492eaf2167d56
Parents: f88c641
Author: Davies Liu <da...@databricks.com>
Authored: Sat Feb 20 21:01:51 2016 -0800
Committer: Davies Liu <da...@gmail.com>
Committed: Sat Feb 20 21:01:51 2016 -0800

----------------------------------------------------------------------
 .../sql/catalyst/parser/ExpressionParser.g      |  2 +
 .../apache/spark/sql/catalyst/CatalystQl.scala  |  2 +
 .../spark/sql/catalyst/analysis/Analyzer.scala  | 32 ++++++++
 .../sql/catalyst/expressions/subquery.scala     | 82 +++++++++++++++++++
 .../sql/catalyst/optimizer/Optimizer.scala      | 14 +++-
 .../spark/sql/catalyst/plans/QueryPlan.scala    |  6 ++
 .../spark/sql/catalyst/trees/TreeNode.scala     | 11 ++-
 .../spark/sql/catalyst/CatalystQlSuite.scala    |  7 ++
 .../catalyst/analysis/AnalysisErrorSuite.scala  | 11 +++
 .../scala/org/apache/spark/sql/SQLContext.scala |  1 +
 .../apache/spark/sql/execution/SparkPlan.scala  | 50 ++++++++++++
 .../spark/sql/execution/WholeStageCodegen.scala |  5 +-
 .../spark/sql/execution/basicOperators.scala    | 15 ++++
 .../apache/spark/sql/execution/subquery.scala   | 74 +++++++++++++++++
 .../org/apache/spark/sql/SQLQuerySuite.scala    |  7 +-
 .../org/apache/spark/sql/SubquerySuite.scala    | 84 ++++++++++++++++++++
 16 files changed, 391 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/79250712/sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/ExpressionParser.g
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/ExpressionParser.g b/sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/ExpressionParser.g
index c162c1a..10f2e24 100644
--- a/sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/ExpressionParser.g
+++ b/sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/ExpressionParser.g
@@ -205,6 +205,8 @@ atomExpression
     | whenExpression
     | (functionName LPAREN) => function
     | tableOrColumn
+    | (LPAREN KW_SELECT) => subQueryExpression
+      -> ^(TOK_SUBQUERY_EXPR ^(TOK_SUBQUERY_OP) subQueryExpression)
     | LPAREN! expression RPAREN!
     ;
 

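With this new alternative, a parenthesized SELECT is accepted wherever an atomExpression is expected. For example, the queries exercised by CatalystQlSuite later in this patch now parse:

```sql
select (select max(b) from s) ss from t
select * from t where a = (select b from s)
select * from t group by g having a > (select b from s)
```
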
http://git-wip-us.apache.org/repos/asf/spark/blob/79250712/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystQl.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystQl.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystQl.scala
index 8099751..b16025a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystQl.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystQl.scala
@@ -667,6 +667,8 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
           UnresolvedAttribute(nameParts :+ cleanIdentifier(attr))
         case other => UnresolvedExtractValue(other, Literal(cleanIdentifier(attr)))
       }
+    case Token("TOK_SUBQUERY_EXPR", Token("TOK_SUBQUERY_OP", Nil) :: subquery :: Nil) =>
+      ScalarSubquery(nodeToPlan(subquery))
 
     /* Stars (*) */
     case Token("TOK_ALLCOLREF", Nil) => UnresolvedStar(None)

http://git-wip-us.apache.org/repos/asf/spark/blob/79250712/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 54b91ad..ba306f8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -80,6 +80,7 @@ class Analyzer(
       ResolveGenerate ::
       ResolveFunctions ::
       ResolveAliases ::
+      ResolveSubquery ::
       ResolveWindowOrder ::
       ResolveWindowFrame ::
       ResolveNaturalJoin ::
@@ -120,7 +121,14 @@ class Analyzer(
             withAlias.getOrElse(relation)
           }
           substituted.getOrElse(u)
+        case other =>
+          // This can't be done in ResolveSubquery because that rule does not know about the CTEs.
+          other transformExpressions {
+            case e: SubqueryExpression =>
+              e.withNewPlan(substituteCTE(e.query, cteRelations))
+          }
       }
+
     }
   }
 
@@ -717,6 +725,30 @@ class Analyzer(
   }
 
   /**
+   * This rule resolves subqueries inside expressions.
+   *
+   * Note: CTEs are handled in CTESubstitution.
+   */
+  object ResolveSubquery extends Rule[LogicalPlan] with PredicateHelper {
+
+    private def hasSubquery(e: Expression): Boolean = {
+      e.find(_.isInstanceOf[SubqueryExpression]).isDefined
+    }
+
+    private def hasSubquery(q: LogicalPlan): Boolean = {
+      q.expressions.exists(hasSubquery)
+    }
+
+    def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+      case q: LogicalPlan if q.childrenResolved && hasSubquery(q) =>
+        q transformExpressions {
+          case e: SubqueryExpression if !e.query.resolved =>
+            e.withNewPlan(execute(e.query))
+        }
+    }
+  }
+
+  /**
    * Turns projections that contain aggregate expressions into aggregations.
    */
   object GlobalAggregates extends Rule[LogicalPlan] {

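The shape of ResolveSubquery — once an operator's children are resolved, recursively analyze any nested subquery plan found in its expressions — can be sketched with a toy plan tree (all of the types below are illustrative stand-ins, not Catalyst's):

```scala
object ResolveSubquerySketch {
  sealed trait Expr { def resolved: Boolean }
  case class Attribute(name: String) extends Expr { val resolved = true }
  case class Unresolved(name: String) extends Expr { val resolved = false }
  case class Subquery(plan: Plan) extends Expr {
    def resolved: Boolean = plan.resolved
  }
  case class Plan(expressions: Seq[Expr]) {
    def resolved: Boolean = expressions.forall(_.resolved)
  }

  // Analogous to `e.withNewPlan(execute(e.query))` in the real rule: run the
  // analyzer recursively on each nested plan that is still unresolved.
  def analyze(plan: Plan): Plan =
    Plan(plan.expressions.map {
      case Unresolved(name)                   => Attribute(name) // toy resolution
      case Subquery(inner) if !inner.resolved => Subquery(analyze(inner))
      case e                                  => e
    })

  def main(args: Array[String]): Unit = {
    val plan = Plan(Seq(Subquery(Plan(Seq(Unresolved("b"))))))
    println(analyze(plan).resolved) // true
  }
}
```
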
http://git-wip-us.apache.org/repos/asf/spark/blob/79250712/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
new file mode 100644
index 0000000..a8f5e1f
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery}
+import org.apache.spark.sql.types.DataType
+
+/**
+ * An interface for subqueries used in expressions.
+ */
+abstract class SubqueryExpression extends LeafExpression {
+
+  /**
+   * The logical plan of the query.
+   */
+  def query: LogicalPlan
+
+  /**
+   * Either a logical plan or a physical plan. The generated tree string (explain output) uses this
+   * field to explain the subquery.
+   */
+  def plan: QueryPlan[_]
+
+  /**
+   * Updates the query with a new logical plan.
+   */
+  def withNewPlan(plan: LogicalPlan): SubqueryExpression
+}
+
+/**
+ * A subquery that will return only one row and one column.
+ *
+ * This will be converted into [[execution.ScalarSubquery]] during physical planning.
+ *
+ * Note: `exprId` is used to give the subquery a unique name in the explain output.
+ */
+case class ScalarSubquery(
+    query: LogicalPlan,
+    exprId: ExprId = NamedExpression.newExprId)
+  extends SubqueryExpression with Unevaluable {
+
+  override def plan: LogicalPlan = Subquery(toString, query)
+
+  override lazy val resolved: Boolean = query.resolved
+
+  override def dataType: DataType = query.schema.fields.head.dataType
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    if (query.schema.length != 1) {
+      TypeCheckResult.TypeCheckFailure("Scalar subquery must return only one column, but got " +
+        query.schema.length.toString)
+    } else {
+      TypeCheckResult.TypeCheckSuccess
+    }
+  }
+
+  override def foldable: Boolean = false
+  override def nullable: Boolean = true
+
+  override def withNewPlan(plan: LogicalPlan): ScalarSubquery = ScalarSubquery(plan, exprId)
+
+  override def toString: String = s"subquery#${exprId.id}"
+
+  // TODO: support sql()
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/79250712/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index b7d8d93..202f6b5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -90,7 +90,19 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
     Batch("Decimal Optimizations", FixedPoint(100),
       DecimalAggregates) ::
     Batch("LocalRelation", FixedPoint(100),
-      ConvertToLocalRelation) :: Nil
+      ConvertToLocalRelation) ::
+    Batch("Subquery", Once,
+      OptimizeSubqueries) :: Nil
+  }
+
+  /**
+   * Optimizes all the subqueries inside expressions.
+   */
+  object OptimizeSubqueries extends Rule[LogicalPlan] {
+    def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
+      case subquery: SubqueryExpression =>
+        subquery.withNewPlan(Optimizer.this.execute(subquery.query))
+    }
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/79250712/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index 05f5bdb..86bd33f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.catalyst.plans
 
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.Subquery
 import org.apache.spark.sql.catalyst.trees.TreeNode
 import org.apache.spark.sql.types.{DataType, StructType}
 
@@ -226,4 +227,9 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy
   protected def statePrefix = if (missingInput.nonEmpty && children.nonEmpty) "!" else ""
 
   override def simpleString: String = statePrefix + super.simpleString
+
+  override def treeChildren: Seq[PlanType] = {
+    val subqueries = expressions.flatMap(_.collect {case e: SubqueryExpression => e})
+    children ++ subqueries.map(e => e.plan.asInstanceOf[PlanType])
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/79250712/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 30df2a8..e46ce1c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -449,6 +449,11 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
   }
 
   /**
+   * All the nodes that will be used to generate the tree string.
+   */
+  protected def treeChildren: Seq[BaseType] = children
+
+  /**
    * Appends the string represent of this node and its children to the given StringBuilder.
    *
    * The `i`-th element in `lastChildren` indicates whether the ancestor of the current node at
@@ -470,9 +475,9 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
     builder.append(simpleString)
     builder.append("\n")
 
-    if (children.nonEmpty) {
-      children.init.foreach(_.generateTreeString(depth + 1, lastChildren :+ false, builder))
-      children.last.generateTreeString(depth + 1, lastChildren :+ true, builder)
+    if (treeChildren.nonEmpty) {
+      treeChildren.init.foreach(_.generateTreeString(depth + 1, lastChildren :+ false, builder))
+      treeChildren.last.generateTreeString(depth + 1, lastChildren :+ true, builder)
     }
 
     builder

http://git-wip-us.apache.org/repos/asf/spark/blob/79250712/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystQlSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystQlSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystQlSuite.scala
index 8d7d6b5..ed71218 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystQlSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystQlSuite.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.types.BooleanType
 import org.apache.spark.unsafe.types.CalendarInterval
 
 class CatalystQlSuite extends PlanTest {
@@ -201,4 +202,10 @@ class CatalystQlSuite extends PlanTest {
     parser.parsePlan("select sum(product + 1) over (partition by (product + (1)) order by 2) " +
       "from windowData")
   }
+
+  test("subquery") {
+    parser.parsePlan("select (select max(b) from s) ss from t")
+    parser.parsePlan("select * from t where a = (select b from s)")
+    parser.parsePlan("select * from t group by g having a > (select b from s)")
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/79250712/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
index a46006a..69f78a0 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
@@ -114,6 +114,17 @@ class AnalysisErrorSuite extends AnalysisTest {
   val dateLit = Literal.create(null, DateType)
 
   errorTest(
+    "scalar subquery with 2 columns",
+     testRelation.select(
+       (ScalarSubquery(testRelation.select('a, dateLit.as('b))) + Literal(1)).as('a)),
+     "Scalar subquery must return only one column, but got 2" :: Nil)
+
+  errorTest(
+    "scalar subquery with no column",
+    testRelation.select(ScalarSubquery(LocalRelation()).as('a)),
+    "Scalar subquery must return only one column, but got 0" :: Nil)
+
+  errorTest(
     "single invalid type, single arg",
     testRelation.select(TestFunction(dateLit :: Nil, IntegerType :: Nil).as('a)),
     "cannot resolve" :: "testfunction" :: "argument 1" :: "requires int type" ::

http://git-wip-us.apache.org/repos/asf/spark/blob/79250712/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index c7d1096..932df36 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -884,6 +884,7 @@ class SQLContext private[sql](
   @transient
   protected[sql] val prepareForExecution = new RuleExecutor[SparkPlan] {
     val batches = Seq(
+      Batch("Subquery", Once, PlanSubqueries(self)),
       Batch("Add exchange", Once, EnsureRequirements(self)),
       Batch("Whole stage codegen", Once, CollapseCodegenStages(self))
     )

http://git-wip-us.apache.org/repos/asf/spark/blob/79250712/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index c72b8dc..872ccde 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql.execution
 import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.concurrent.duration._
 
 import org.apache.spark.Logging
 import org.apache.spark.rdd.{RDD, RDDOperationScope}
@@ -31,6 +33,7 @@ import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.metric.{LongSQLMetric, SQLMetric}
 import org.apache.spark.sql.types.DataType
+import org.apache.spark.util.ThreadUtils
 
 /**
  * The base class for physical operators.
@@ -112,16 +115,58 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   final def execute(): RDD[InternalRow] = {
     RDDOperationScope.withScope(sparkContext, nodeName, false, true) {
       prepare()
+      waitForSubqueries()
       doExecute()
     }
   }
 
+  // All the subqueries and their Future of results.
+  @transient private val queryResults = ArrayBuffer[(ScalarSubquery, Future[Array[InternalRow]])]()
+
+  /**
+   * Collects all the subqueries and creates a Future to fetch the first two rows of each.
+   */
+  protected def prepareSubqueries(): Unit = {
+    val allSubqueries = expressions.flatMap(_.collect {case e: ScalarSubquery => e})
+    allSubqueries.asInstanceOf[Seq[ScalarSubquery]].foreach { e =>
+      val futureResult = Future {
+        // We only need the first row, but take up to two rows so we can throw an exception
+        // if more than one row is returned.
+        e.executedPlan.executeTake(2)
+      }(SparkPlan.subqueryExecutionContext)
+      queryResults += e -> futureResult
+    }
+  }
+
+  /**
+   * Waits for all the subqueries to finish and updates the results.
+   */
+  protected def waitForSubqueries(): Unit = {
+    // fill in the result of subqueries
+    queryResults.foreach {
+      case (e, futureResult) =>
+        val rows = Await.result(futureResult, Duration.Inf)
+        if (rows.length > 1) {
+          sys.error(s"more than one row returned by a subquery used as an expression:\n${e.plan}")
+        }
+        if (rows.length == 1) {
+          assert(rows(0).numFields == 1, "Analyzer should make sure this only returns one column")
+          e.updateResult(rows(0).get(0, e.dataType))
+        } else {
+          // No rows were returned, so the result should be null.
+          e.updateResult(null)
+        }
+    }
+    queryResults.clear()
+  }
+
   /**
    * Prepare a SparkPlan for execution. It's idempotent.
    */
   final def prepare(): Unit = {
     if (prepareCalled.compareAndSet(false, true)) {
       doPrepare()
+      prepareSubqueries()
       children.foreach(_.prepare())
     }
   }
@@ -231,6 +276,11 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   }
 }
 
+object SparkPlan {
+  private[execution] val subqueryExecutionContext = ExecutionContext.fromExecutorService(
+    ThreadUtils.newDaemonCachedThreadPool("subquery", 16))
+}
+
 private[sql] trait LeafNode extends SparkPlan {
   override def children: Seq[SparkPlan] = Nil
   override def producedAttributes: AttributeSet = outputSet

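The executeTake(2) trick above can be isolated into a small helper (a sketch, not Spark code): fetching at most two rows distinguishes the zero-row, one-row, and error cases without materializing the whole subquery result.

```scala
object ScalarResultSketch {
  // Some(value) for exactly one row, None (SQL NULL) for zero rows, and an
  // error for two or more rows -- mirroring waitForSubqueries() above.
  def scalarResult[T](rows: Iterator[T]): Option[T] = {
    val firstTwo = rows.take(2).toList
    if (firstTwo.length > 1) {
      sys.error("more than one row returned by a subquery used as an expression")
    }
    firstTwo.headOption
  }

  def main(args: Array[String]): Unit = {
    println(scalarResult(Iterator(5)))    // Some(5)
    println(scalarResult(Iterator.empty)) // None
  }
}
```
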
http://git-wip-us.apache.org/repos/asf/spark/blob/79250712/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
index 8626f54..ad8564e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
@@ -73,9 +73,10 @@ trait CodegenSupport extends SparkPlan {
   /**
     * Returns Java source code to process the rows from upstream.
     */
-  def produce(ctx: CodegenContext, parent: CodegenSupport): String = {
+  final def produce(ctx: CodegenContext, parent: CodegenSupport): String = {
     this.parent = parent
     ctx.freshNamePrefix = variablePrefix
+    waitForSubqueries()
     doProduce(ctx)
   }
 
@@ -101,7 +102,7 @@ trait CodegenSupport extends SparkPlan {
   /**
     * Consume the columns generated from current SparkPlan, call it's parent.
     */
-  def consume(ctx: CodegenContext, input: Seq[ExprCode], row: String = null): String = {
+  final def consume(ctx: CodegenContext, input: Seq[ExprCode], row: String = null): String = {
     if (input != null) {
       assert(input.length == output.length)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/79250712/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 4b82d55..55bddd1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -343,3 +343,18 @@ case class OutputFaker(output: Seq[Attribute], child: SparkPlan) extends SparkPl
 
   protected override def doExecute(): RDD[InternalRow] = child.execute()
 }
+
+/**
+ * A plan wrapped as a subquery.
+ *
+ * This is used to generate the tree string for SparkScalarSubquery.
+ */
+case class Subquery(name: String, child: SparkPlan) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    throw new UnsupportedOperationException
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/79250712/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
new file mode 100644
index 0000000..9c645c7
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.catalyst.{expressions, InternalRow}
+import org.apache.spark.sql.catalyst.expressions.{ExprId, Literal, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.types.DataType
+
+/**
+ * A subquery that will return only one row and one column.
+ *
+ * This is the physical copy of ScalarSubquery to be used inside SparkPlan.
+ */
+case class ScalarSubquery(
+    @transient executedPlan: SparkPlan,
+    exprId: ExprId)
+  extends SubqueryExpression {
+
+  override def query: LogicalPlan = throw new UnsupportedOperationException
+  override def withNewPlan(plan: LogicalPlan): SubqueryExpression = {
+    throw new UnsupportedOperationException
+  }
+  override def plan: SparkPlan = Subquery(simpleString, executedPlan)
+
+  override def dataType: DataType = executedPlan.schema.fields.head.dataType
+  override def nullable: Boolean = true
+  override def toString: String = s"subquery#${exprId.id}"
+
+  // The first column of the first row from `query`.
+  private var result: Any = null
+
+  def updateResult(v: Any): Unit = {
+    result = v
+  }
+
+  override def eval(input: InternalRow): Any = result
+
+  override def genCode(ctx: CodegenContext, ev: ExprCode): String = {
+    Literal.create(result, dataType).genCode(ctx, ev)
+  }
+}
+
+/**
+ * Converts the subquery from a logical plan into an executed plan.
+ */
+private[sql] case class PlanSubqueries(sqlContext: SQLContext) extends Rule[SparkPlan] {
+  def apply(plan: SparkPlan): SparkPlan = {
+    plan.transformAllExpressions {
+      case subquery: expressions.ScalarSubquery =>
+        val sparkPlan = sqlContext.planner.plan(ReturnAnswer(subquery.query)).next()
+        val executedPlan = sqlContext.prepareForExecution.execute(sparkPlan)
+        ScalarSubquery(executedPlan, subquery.exprId)
+    }
+  }
+}

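Once waitForSubqueries() has filled in its result, this physical ScalarSubquery evaluates like a constant for every input row, and genCode embeds the value as a literal. A stripped-down sketch of that holder contract, independent of Catalyst:

```scala
// A mutable holder that, once filled in, returns the same precomputed value
// on every call -- like ScalarSubquery after updateResult().
final class SubqueryResult {
  private var result: Any = null // null is a legal result (zero-row subquery)
  def updateResult(v: Any): Unit = { result = v }
  def eval(): Any = result
}
```
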
http://git-wip-us.apache.org/repos/asf/spark/blob/79250712/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index b3e1797..b4d6f4e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -21,18 +21,13 @@ import java.math.MathContext
 import java.sql.Timestamp
 
 import org.apache.spark.AccumulatorSuite
-import org.apache.spark.sql.catalyst.CatalystQl
-import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
-import org.apache.spark.sql.catalyst.parser.ParserConf
-import org.apache.spark.sql.execution.{aggregate, SparkQl}
+import org.apache.spark.sql.execution.aggregate
 import org.apache.spark.sql.execution.joins.{CartesianProduct, SortMergeJoin}
-import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.{SharedSQLContext, TestSQLContext}
 import org.apache.spark.sql.test.SQLTestData._
 import org.apache.spark.sql.types._
 
-
 class SQLQuerySuite extends QueryTest with SharedSQLContext {
   import testImplicits._
 

http://git-wip-us.apache.org/repos/asf/spark/blob/79250712/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
new file mode 100644
index 0000000..e851eb0
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.test.SharedSQLContext
+
+class SubquerySuite extends QueryTest with SharedSQLContext {
+
+  test("simple uncorrelated scalar subquery") {
+    assertResult(Array(Row(1))) {
+      sql("select (select 1 as b) as b").collect()
+    }
+
+    assertResult(Array(Row(1))) {
+      sql("with t2 as (select 1 as b, 2 as c) " +
+        "select a from (select 1 as a union all select 2 as a) t " +
+        "where a = (select max(b) from t2) ").collect()
+    }
+
+    assertResult(Array(Row(3))) {
+      sql("select (select (select 1) + 1) + 1").collect()
+    }
+
+    // more than one column
+    val error = intercept[AnalysisException] {
+      sql("select (select 1, 2) as b").collect()
+    }
+    assert(error.message contains "Scalar subquery must return only one column, but got 2")
+
+    // more than one rows
+    val error2 = intercept[RuntimeException] {
+      sql("select (select a from (select 1 as a union all select 2 as a) t) as b").collect()
+    }
+    assert(error2.getMessage contains
+      "more than one row returned by a subquery used as an expression")
+
+    // string type
+    assertResult(Array(Row("s"))) {
+      sql("select (select 's' as s) as b").collect()
+    }
+
+    // zero rows
+    assertResult(Array(Row(null))) {
+      sql("select (select 's' as s limit 0) as b").collect()
+    }
+  }
+
+  test("uncorrelated scalar subquery on testData") {
+    // initialize test Data
+    testData
+
+    assertResult(Array(Row(5))) {
+      sql("select (select key from testData where key > 3 limit 1) + 1").collect()
+    }
+
+    assertResult(Array(Row(-100))) {
+      sql("select -(select max(key) from testData)").collect()
+    }
+
+    assertResult(Array(Row(null))) {
+      sql("select (select value from testData limit 0)").collect()
+    }
+
+    assertResult(Array(Row("99"))) {
+      sql("select (select min(value) from testData" +
+        " where key = (select max(key) from testData) - 1)").collect()
+    }
+  }
+}

