You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by da...@apache.org on 2016/02/21 06:02:01 UTC
spark git commit: [SPARK-13306] [SQL] uncorrelated scalar subquery
Repository: spark
Updated Branches:
refs/heads/master f88c641bc -> 792507128
[SPARK-13306] [SQL] uncorrelated scalar subquery
A scalar subquery is a subquery that generates only a single row and a single column, and can be used as part of an expression. An uncorrelated scalar subquery means it does not have a reference to an external table.
All the uncorrelated scalar subqueries will be executed during prepare() of SparkPlan.
The plans for query
```sql
select 1 + (select 2 + (select 3))
```
looks like this
```
== Parsed Logical Plan ==
'Project [unresolvedalias((1 + subquery#1),None)]
:- OneRowRelation$
+- 'Subquery subquery#1
+- 'Project [unresolvedalias((2 + subquery#0),None)]
:- OneRowRelation$
+- 'Subquery subquery#0
+- 'Project [unresolvedalias(3,None)]
+- OneRowRelation$
== Analyzed Logical Plan ==
_c0: int
Project [(1 + subquery#1) AS _c0#4]
:- OneRowRelation$
+- Subquery subquery#1
+- Project [(2 + subquery#0) AS _c0#3]
:- OneRowRelation$
+- Subquery subquery#0
+- Project [3 AS _c0#2]
+- OneRowRelation$
== Optimized Logical Plan ==
Project [(1 + subquery#1) AS _c0#4]
:- OneRowRelation$
+- Subquery subquery#1
+- Project [(2 + subquery#0) AS _c0#3]
:- OneRowRelation$
+- Subquery subquery#0
+- Project [3 AS _c0#2]
+- OneRowRelation$
== Physical Plan ==
WholeStageCodegen
: +- Project [(1 + subquery#1) AS _c0#4]
: :- INPUT
: +- Subquery subquery#1
: +- WholeStageCodegen
: : +- Project [(2 + subquery#0) AS _c0#3]
: : :- INPUT
: : +- Subquery subquery#0
: : +- WholeStageCodegen
: : : +- Project [3 AS _c0#2]
: : : +- INPUT
: : +- Scan OneRowRelation[]
: +- Scan OneRowRelation[]
+- Scan OneRowRelation[]
```
Author: Davies Liu <da...@databricks.com>
Closes #11190 from davies/scalar_subquery.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/79250712
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/79250712
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/79250712
Branch: refs/heads/master
Commit: 7925071280bfa1570435bde3e93492eaf2167d56
Parents: f88c641
Author: Davies Liu <da...@databricks.com>
Authored: Sat Feb 20 21:01:51 2016 -0800
Committer: Davies Liu <da...@gmail.com>
Committed: Sat Feb 20 21:01:51 2016 -0800
----------------------------------------------------------------------
.../sql/catalyst/parser/ExpressionParser.g | 2 +
.../apache/spark/sql/catalyst/CatalystQl.scala | 2 +
.../spark/sql/catalyst/analysis/Analyzer.scala | 32 ++++++++
.../sql/catalyst/expressions/subquery.scala | 82 +++++++++++++++++++
.../sql/catalyst/optimizer/Optimizer.scala | 14 +++-
.../spark/sql/catalyst/plans/QueryPlan.scala | 6 ++
.../spark/sql/catalyst/trees/TreeNode.scala | 11 ++-
.../spark/sql/catalyst/CatalystQlSuite.scala | 7 ++
.../catalyst/analysis/AnalysisErrorSuite.scala | 11 +++
.../scala/org/apache/spark/sql/SQLContext.scala | 1 +
.../apache/spark/sql/execution/SparkPlan.scala | 50 ++++++++++++
.../spark/sql/execution/WholeStageCodegen.scala | 5 +-
.../spark/sql/execution/basicOperators.scala | 15 ++++
.../apache/spark/sql/execution/subquery.scala | 74 +++++++++++++++++
.../org/apache/spark/sql/SQLQuerySuite.scala | 7 +-
.../org/apache/spark/sql/SubquerySuite.scala | 84 ++++++++++++++++++++
16 files changed, 391 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/79250712/sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/ExpressionParser.g
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/ExpressionParser.g b/sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/ExpressionParser.g
index c162c1a..10f2e24 100644
--- a/sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/ExpressionParser.g
+++ b/sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/ExpressionParser.g
@@ -205,6 +205,8 @@ atomExpression
| whenExpression
| (functionName LPAREN) => function
| tableOrColumn
+ | (LPAREN KW_SELECT) => subQueryExpression
+ -> ^(TOK_SUBQUERY_EXPR ^(TOK_SUBQUERY_OP) subQueryExpression)
| LPAREN! expression RPAREN!
;
http://git-wip-us.apache.org/repos/asf/spark/blob/79250712/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystQl.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystQl.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystQl.scala
index 8099751..b16025a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystQl.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystQl.scala
@@ -667,6 +667,8 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
UnresolvedAttribute(nameParts :+ cleanIdentifier(attr))
case other => UnresolvedExtractValue(other, Literal(cleanIdentifier(attr)))
}
+ case Token("TOK_SUBQUERY_EXPR", Token("TOK_SUBQUERY_OP", Nil) :: subquery :: Nil) =>
+ ScalarSubquery(nodeToPlan(subquery))
/* Stars (*) */
case Token("TOK_ALLCOLREF", Nil) => UnresolvedStar(None)
http://git-wip-us.apache.org/repos/asf/spark/blob/79250712/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 54b91ad..ba306f8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -80,6 +80,7 @@ class Analyzer(
ResolveGenerate ::
ResolveFunctions ::
ResolveAliases ::
+ ResolveSubquery ::
ResolveWindowOrder ::
ResolveWindowFrame ::
ResolveNaturalJoin ::
@@ -120,7 +121,14 @@ class Analyzer(
withAlias.getOrElse(relation)
}
substituted.getOrElse(u)
+ case other =>
+ // This can't be done in ResolveSubquery because that does not know the CTE.
+ other transformExpressions {
+ case e: SubqueryExpression =>
+ e.withNewPlan(substituteCTE(e.query, cteRelations))
+ }
}
+
}
}
@@ -717,6 +725,30 @@ class Analyzer(
}
/**
+ * This rule resolve subqueries inside expressions.
+ *
+ * Note: CTE are handled in CTESubstitution.
+ */
+ object ResolveSubquery extends Rule[LogicalPlan] with PredicateHelper {
+
+ private def hasSubquery(e: Expression): Boolean = {
+ e.find(_.isInstanceOf[SubqueryExpression]).isDefined
+ }
+
+ private def hasSubquery(q: LogicalPlan): Boolean = {
+ q.expressions.exists(hasSubquery)
+ }
+
+ def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+ case q: LogicalPlan if q.childrenResolved && hasSubquery(q) =>
+ q transformExpressions {
+ case e: SubqueryExpression if !e.query.resolved =>
+ e.withNewPlan(execute(e.query))
+ }
+ }
+ }
+
+ /**
* Turns projections that contain aggregate expressions into aggregations.
*/
object GlobalAggregates extends Rule[LogicalPlan] {
http://git-wip-us.apache.org/repos/asf/spark/blob/79250712/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
new file mode 100644
index 0000000..a8f5e1f
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery}
+import org.apache.spark.sql.types.DataType
+
+/**
+ * An interface for subquery that is used in expressions.
+ */
+abstract class SubqueryExpression extends LeafExpression {
+
+ /**
+ * The logical plan of the query.
+ */
+ def query: LogicalPlan
+
+ /**
+ * Either a logical plan or a physical plan. The generated tree string (explain output) uses this
+ * field to explain the subquery.
+ */
+ def plan: QueryPlan[_]
+
+ /**
+ * Updates the query with new logical plan.
+ */
+ def withNewPlan(plan: LogicalPlan): SubqueryExpression
+}
+
+/**
+ * A subquery that will return only one row and one column.
+ *
+ * This will be converted into [[execution.ScalarSubquery]] during physical planning.
+ *
+ * Note: `exprId` is used to have unique name in explain string output.
+ */
+case class ScalarSubquery(
+ query: LogicalPlan,
+ exprId: ExprId = NamedExpression.newExprId)
+ extends SubqueryExpression with Unevaluable {
+
+ override def plan: LogicalPlan = Subquery(toString, query)
+
+ override lazy val resolved: Boolean = query.resolved
+
+ override def dataType: DataType = query.schema.fields.head.dataType
+
+ override def checkInputDataTypes(): TypeCheckResult = {
+ if (query.schema.length != 1) {
+ TypeCheckResult.TypeCheckFailure("Scalar subquery must return only one column, but got " +
+ query.schema.length.toString)
+ } else {
+ TypeCheckResult.TypeCheckSuccess
+ }
+ }
+
+ override def foldable: Boolean = false
+ override def nullable: Boolean = true
+
+ override def withNewPlan(plan: LogicalPlan): ScalarSubquery = ScalarSubquery(plan, exprId)
+
+ override def toString: String = s"subquery#${exprId.id}"
+
+ // TODO: support sql()
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/79250712/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index b7d8d93..202f6b5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -90,7 +90,19 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
Batch("Decimal Optimizations", FixedPoint(100),
DecimalAggregates) ::
Batch("LocalRelation", FixedPoint(100),
- ConvertToLocalRelation) :: Nil
+ ConvertToLocalRelation) ::
+ Batch("Subquery", Once,
+ OptimizeSubqueries) :: Nil
+ }
+
+ /**
+ * Optimize all the subqueries inside expression.
+ */
+ object OptimizeSubqueries extends Rule[LogicalPlan] {
+ def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
+ case subquery: SubqueryExpression =>
+ subquery.withNewPlan(Optimizer.this.execute(subquery.query))
+ }
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/79250712/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index 05f5bdb..86bd33f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.catalyst.plans
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.Subquery
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.types.{DataType, StructType}
@@ -226,4 +227,9 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy
protected def statePrefix = if (missingInput.nonEmpty && children.nonEmpty) "!" else ""
override def simpleString: String = statePrefix + super.simpleString
+
+ override def treeChildren: Seq[PlanType] = {
+ val subqueries = expressions.flatMap(_.collect {case e: SubqueryExpression => e})
+ children ++ subqueries.map(e => e.plan.asInstanceOf[PlanType])
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/79250712/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 30df2a8..e46ce1c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -449,6 +449,11 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
}
/**
+ * All the nodes that will be used to generate tree string.
+ */
+ protected def treeChildren: Seq[BaseType] = children
+
+ /**
* Appends the string represent of this node and its children to the given StringBuilder.
*
* The `i`-th element in `lastChildren` indicates whether the ancestor of the current node at
@@ -470,9 +475,9 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
builder.append(simpleString)
builder.append("\n")
- if (children.nonEmpty) {
- children.init.foreach(_.generateTreeString(depth + 1, lastChildren :+ false, builder))
- children.last.generateTreeString(depth + 1, lastChildren :+ true, builder)
+ if (treeChildren.nonEmpty) {
+ treeChildren.init.foreach(_.generateTreeString(depth + 1, lastChildren :+ false, builder))
+ treeChildren.last.generateTreeString(depth + 1, lastChildren :+ true, builder)
}
builder
http://git-wip-us.apache.org/repos/asf/spark/blob/79250712/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystQlSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystQlSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystQlSuite.scala
index 8d7d6b5..ed71218 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystQlSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystQlSuite.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.types.BooleanType
import org.apache.spark.unsafe.types.CalendarInterval
class CatalystQlSuite extends PlanTest {
@@ -201,4 +202,10 @@ class CatalystQlSuite extends PlanTest {
parser.parsePlan("select sum(product + 1) over (partition by (product + (1)) order by 2) " +
"from windowData")
}
+
+ test("subquery") {
+ parser.parsePlan("select (select max(b) from s) ss from t")
+ parser.parsePlan("select * from t where a = (select b from s)")
+ parser.parsePlan("select * from t group by g having a > (select b from s)")
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/79250712/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
index a46006a..69f78a0 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
@@ -114,6 +114,17 @@ class AnalysisErrorSuite extends AnalysisTest {
val dateLit = Literal.create(null, DateType)
errorTest(
+ "scalar subquery with 2 columns",
+ testRelation.select(
+ (ScalarSubquery(testRelation.select('a, dateLit.as('b))) + Literal(1)).as('a)),
+ "Scalar subquery must return only one column, but got 2" :: Nil)
+
+ errorTest(
+ "scalar subquery with no column",
+ testRelation.select(ScalarSubquery(LocalRelation()).as('a)),
+ "Scalar subquery must return only one column, but got 0" :: Nil)
+
+ errorTest(
"single invalid type, single arg",
testRelation.select(TestFunction(dateLit :: Nil, IntegerType :: Nil).as('a)),
"cannot resolve" :: "testfunction" :: "argument 1" :: "requires int type" ::
http://git-wip-us.apache.org/repos/asf/spark/blob/79250712/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index c7d1096..932df36 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -884,6 +884,7 @@ class SQLContext private[sql](
@transient
protected[sql] val prepareForExecution = new RuleExecutor[SparkPlan] {
val batches = Seq(
+ Batch("Subquery", Once, PlanSubqueries(self)),
Batch("Add exchange", Once, EnsureRequirements(self)),
Batch("Whole stage codegen", Once, CollapseCodegenStages(self))
)
http://git-wip-us.apache.org/repos/asf/spark/blob/79250712/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index c72b8dc..872ccde 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql.execution
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.concurrent.duration._
import org.apache.spark.Logging
import org.apache.spark.rdd.{RDD, RDDOperationScope}
@@ -31,6 +33,7 @@ import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.metric.{LongSQLMetric, SQLMetric}
import org.apache.spark.sql.types.DataType
+import org.apache.spark.util.ThreadUtils
/**
* The base class for physical operators.
@@ -112,16 +115,58 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
final def execute(): RDD[InternalRow] = {
RDDOperationScope.withScope(sparkContext, nodeName, false, true) {
prepare()
+ waitForSubqueries()
doExecute()
}
}
+ // All the subqueries and their Future of results.
+ @transient private val queryResults = ArrayBuffer[(ScalarSubquery, Future[Array[InternalRow]])]()
+
+ /**
+ * Collects all the subqueries and create a Future to take the first two rows of them.
+ */
+ protected def prepareSubqueries(): Unit = {
+ val allSubqueries = expressions.flatMap(_.collect {case e: ScalarSubquery => e})
+ allSubqueries.asInstanceOf[Seq[ScalarSubquery]].foreach { e =>
+ val futureResult = Future {
+ // We only need the first row, try to take two rows so we can throw an exception if there
+ // are more than one rows returned.
+ e.executedPlan.executeTake(2)
+ }(SparkPlan.subqueryExecutionContext)
+ queryResults += e -> futureResult
+ }
+ }
+
+ /**
+ * Waits for all the subqueries to finish and updates the results.
+ */
+ protected def waitForSubqueries(): Unit = {
+ // fill in the result of subqueries
+ queryResults.foreach {
+ case (e, futureResult) =>
+ val rows = Await.result(futureResult, Duration.Inf)
+ if (rows.length > 1) {
+ sys.error(s"more than one row returned by a subquery used as an expression:\n${e.plan}")
+ }
+ if (rows.length == 1) {
+ assert(rows(0).numFields == 1, "Analyzer should make sure this only returns one column")
+ e.updateResult(rows(0).get(0, e.dataType))
+ } else {
+ // There is no rows returned, the result should be null.
+ e.updateResult(null)
+ }
+ }
+ queryResults.clear()
+ }
+
/**
* Prepare a SparkPlan for execution. It's idempotent.
*/
final def prepare(): Unit = {
if (prepareCalled.compareAndSet(false, true)) {
doPrepare()
+ prepareSubqueries()
children.foreach(_.prepare())
}
}
@@ -231,6 +276,11 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
}
}
+object SparkPlan {
+ private[execution] val subqueryExecutionContext = ExecutionContext.fromExecutorService(
+ ThreadUtils.newDaemonCachedThreadPool("subquery", 16))
+}
+
private[sql] trait LeafNode extends SparkPlan {
override def children: Seq[SparkPlan] = Nil
override def producedAttributes: AttributeSet = outputSet
http://git-wip-us.apache.org/repos/asf/spark/blob/79250712/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
index 8626f54..ad8564e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
@@ -73,9 +73,10 @@ trait CodegenSupport extends SparkPlan {
/**
* Returns Java source code to process the rows from upstream.
*/
- def produce(ctx: CodegenContext, parent: CodegenSupport): String = {
+ final def produce(ctx: CodegenContext, parent: CodegenSupport): String = {
this.parent = parent
ctx.freshNamePrefix = variablePrefix
+ waitForSubqueries()
doProduce(ctx)
}
@@ -101,7 +102,7 @@ trait CodegenSupport extends SparkPlan {
/**
* Consume the columns generated from current SparkPlan, call it's parent.
*/
- def consume(ctx: CodegenContext, input: Seq[ExprCode], row: String = null): String = {
+ final def consume(ctx: CodegenContext, input: Seq[ExprCode], row: String = null): String = {
if (input != null) {
assert(input.length == output.length)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/79250712/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 4b82d55..55bddd1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -343,3 +343,18 @@ case class OutputFaker(output: Seq[Attribute], child: SparkPlan) extends SparkPl
protected override def doExecute(): RDD[InternalRow] = child.execute()
}
+
+/**
+ * A plan as subquery.
+ *
+ * This is used to generate tree string for SparkScalarSubquery.
+ */
+case class Subquery(name: String, child: SparkPlan) extends UnaryNode {
+ override def output: Seq[Attribute] = child.output
+ override def outputPartitioning: Partitioning = child.outputPartitioning
+ override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+ protected override def doExecute(): RDD[InternalRow] = {
+ throw new UnsupportedOperationException
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/79250712/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
new file mode 100644
index 0000000..9c645c7
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.catalyst.{expressions, InternalRow}
+import org.apache.spark.sql.catalyst.expressions.{ExprId, Literal, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.types.DataType
+
+/**
+ * A subquery that will return only one row and one column.
+ *
+ * This is the physical copy of ScalarSubquery to be used inside SparkPlan.
+ */
+case class ScalarSubquery(
+ @transient executedPlan: SparkPlan,
+ exprId: ExprId)
+ extends SubqueryExpression {
+
+ override def query: LogicalPlan = throw new UnsupportedOperationException
+ override def withNewPlan(plan: LogicalPlan): SubqueryExpression = {
+ throw new UnsupportedOperationException
+ }
+ override def plan: SparkPlan = Subquery(simpleString, executedPlan)
+
+ override def dataType: DataType = executedPlan.schema.fields.head.dataType
+ override def nullable: Boolean = true
+ override def toString: String = s"subquery#${exprId.id}"
+
+ // the first column in first row from `query`.
+ private var result: Any = null
+
+ def updateResult(v: Any): Unit = {
+ result = v
+ }
+
+ override def eval(input: InternalRow): Any = result
+
+ override def genCode(ctx: CodegenContext, ev: ExprCode): String = {
+ Literal.create(result, dataType).genCode(ctx, ev)
+ }
+}
+
+/**
+ * Convert the subquery from logical plan into executed plan.
+ */
+private[sql] case class PlanSubqueries(sqlContext: SQLContext) extends Rule[SparkPlan] {
+ def apply(plan: SparkPlan): SparkPlan = {
+ plan.transformAllExpressions {
+ case subquery: expressions.ScalarSubquery =>
+ val sparkPlan = sqlContext.planner.plan(ReturnAnswer(subquery.query)).next()
+ val executedPlan = sqlContext.prepareForExecution.execute(sparkPlan)
+ ScalarSubquery(executedPlan, subquery.exprId)
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/79250712/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index b3e1797..b4d6f4e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -21,18 +21,13 @@ import java.math.MathContext
import java.sql.Timestamp
import org.apache.spark.AccumulatorSuite
-import org.apache.spark.sql.catalyst.CatalystQl
-import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
-import org.apache.spark.sql.catalyst.parser.ParserConf
-import org.apache.spark.sql.execution.{aggregate, SparkQl}
+import org.apache.spark.sql.execution.aggregate
import org.apache.spark.sql.execution.joins.{CartesianProduct, SortMergeJoin}
-import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.{SharedSQLContext, TestSQLContext}
import org.apache.spark.sql.test.SQLTestData._
import org.apache.spark.sql.types._
-
class SQLQuerySuite extends QueryTest with SharedSQLContext {
import testImplicits._
http://git-wip-us.apache.org/repos/asf/spark/blob/79250712/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
new file mode 100644
index 0000000..e851eb0
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.test.SharedSQLContext
+
+class SubquerySuite extends QueryTest with SharedSQLContext {
+
+ test("simple uncorrelated scalar subquery") {
+ assertResult(Array(Row(1))) {
+ sql("select (select 1 as b) as b").collect()
+ }
+
+ assertResult(Array(Row(1))) {
+ sql("with t2 as (select 1 as b, 2 as c) " +
+ "select a from (select 1 as a union all select 2 as a) t " +
+ "where a = (select max(b) from t2) ").collect()
+ }
+
+ assertResult(Array(Row(3))) {
+ sql("select (select (select 1) + 1) + 1").collect()
+ }
+
+ // more than one columns
+ val error = intercept[AnalysisException] {
+ sql("select (select 1, 2) as b").collect()
+ }
+ assert(error.message contains "Scalar subquery must return only one column, but got 2")
+
+ // more than one rows
+ val error2 = intercept[RuntimeException] {
+ sql("select (select a from (select 1 as a union all select 2 as a) t) as b").collect()
+ }
+ assert(error2.getMessage contains
+ "more than one row returned by a subquery used as an expression")
+
+ // string type
+ assertResult(Array(Row("s"))) {
+ sql("select (select 's' as s) as b").collect()
+ }
+
+ // zero rows
+ assertResult(Array(Row(null))) {
+ sql("select (select 's' as s limit 0) as b").collect()
+ }
+ }
+
+ test("uncorrelated scalar subquery on testData") {
+ // initialize test Data
+ testData
+
+ assertResult(Array(Row(5))) {
+ sql("select (select key from testData where key > 3 limit 1) + 1").collect()
+ }
+
+ assertResult(Array(Row(-100))) {
+ sql("select -(select max(key) from testData)").collect()
+ }
+
+ assertResult(Array(Row(null))) {
+ sql("select (select value from testData limit 0)").collect()
+ }
+
+ assertResult(Array(Row("99"))) {
+ sql("select (select min(value) from testData" +
+ " where key = (select max(key) from testData) - 1)").collect()
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org